mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-06 02:41:48 +00:00
- Add normalize_profile_name() for lowercase canonical IDs and Default alias - Use canonical names in create/delete/rename/export/import/set_active paths - Canonicalize Kanban assignee on create/assign, list filter, and worker spawn - Tests for mixed-case assignees and profile resolution (fixes #18498)
3326 lines
124 KiB
Python
3326 lines
124 KiB
Python
"""SQLite-backed Kanban board for multi-profile, multi-project collaboration.
|
||
|
||
In a fresh install the board lives at ``<root>/kanban.db`` where
|
||
``<root>`` is the **shared Hermes root** (the parent of any active
|
||
profile). Profiles intentionally collapse onto a shared board: it IS
|
||
the cross-profile coordination primitive. A worker spawned with
|
||
``hermes -p <profile>`` joins the same board as the dispatcher that
|
||
claimed the task. The same applies to ``<root>/kanban/workspaces/`` and
|
||
``<root>/kanban/logs/``.
|
||
|
||
**Multiple boards (projects):** users can create additional boards to
|
||
separate unrelated streams of work (e.g. one per project / repo / domain).
|
||
Each board is a directory under ``<root>/kanban/boards/<slug>/`` with
|
||
its own ``kanban.db``, ``workspaces/``, and ``logs/``. All boards share
|
||
the profile's Hermes home but are otherwise isolated: a worker spawned
|
||
for a task on board ``atm10-server`` sees only that board's tasks,
|
||
cannot enumerate other boards, and its dispatcher ticks don't touch
|
||
other boards' DBs.
|
||
|
||
The first (and for single-project users, only) board is ``default``.
|
||
For back-compat its on-disk DB is ``<root>/kanban.db`` (not
|
||
``boards/default/kanban.db``), so installs that predate the boards
|
||
feature keep working with zero migration. See :func:`kanban_db_path`.
|
||
|
||
Board resolution order (highest precedence first, all optional):
|
||
|
||
* ``board=`` argument passed directly to :func:`connect` / :func:`init_db`
|
||
(explicit — used by the CLI ``--board`` flag and the dashboard
|
||
``?board=...`` query param).
|
||
* ``HERMES_KANBAN_BOARD`` env var (used by the dispatcher to pin workers
|
||
to the board their task lives on — workers cannot see other boards).
|
||
* ``HERMES_KANBAN_DB`` env var (pins the DB file path directly — legacy
|
||
override still honoured; highest precedence when the file path itself
|
||
is what the caller wants to force).
|
||
* ``<root>/kanban/current`` — a one-line text file holding the slug of
|
||
the "currently selected" board. Written by ``hermes kanban boards
|
||
switch <slug>``. When absent, the active board is ``default``.
|
||
|
||
In standard installs ``<root>`` is ``~/.hermes``. In Docker / custom
|
||
deployments where ``HERMES_HOME`` points outside ``~/.hermes`` (e.g.
|
||
``/opt/hermes``), ``<root>`` is ``HERMES_HOME``. Legacy env-var
|
||
overrides still work:
|
||
|
||
* ``HERMES_KANBAN_DB`` — pin the database file path directly.
|
||
* ``HERMES_KANBAN_WORKSPACES_ROOT`` — pin the workspaces root directly.
|
||
* ``HERMES_KANBAN_HOME`` — pin the umbrella root that anchors kanban
|
||
paths. Useful for tests and unusual deployments.
|
||
|
||
The dispatcher injects ``HERMES_KANBAN_DB``,
|
||
``HERMES_KANBAN_WORKSPACES_ROOT``, and ``HERMES_KANBAN_BOARD`` into
|
||
worker subprocess env so workers converge on the exact DB the
|
||
dispatcher used to claim their task — even under unusual symlink or
|
||
Docker layouts.
|
||
|
||
Schema is intentionally small: tasks, task_links, task_comments,
|
||
task_events. The ``workspace_kind`` field decouples coordination from git
|
||
worktrees so that research / ops / digital-twin workloads work alongside
|
||
coding workloads. See ``docs/hermes-kanban-v1-spec.pdf`` for the full
|
||
design specification.
|
||
|
||
Concurrency strategy: WAL mode + ``BEGIN IMMEDIATE`` for write
|
||
transactions + compare-and-swap (CAS) updates on ``tasks.status`` and
|
||
``tasks.claim_lock``. SQLite serializes writers via its WAL lock, so at
|
||
most one claimer can win any given task. Losers observe zero affected
|
||
rows and move on -- no retry loops, no distributed-lock machinery.
|
||
The CAS coordination is **per-board** — each board is a separate DB,
|
||
so multi-board installs get the same atomicity guarantees without any
|
||
new locking.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import contextlib
|
||
import json
|
||
import os
|
||
import re
|
||
import secrets
|
||
import sqlite3
|
||
import sys
|
||
import time
|
||
from dataclasses import dataclass, field
|
||
from pathlib import Path
|
||
from typing import Any, Iterable, Optional
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Constants
|
||
# ---------------------------------------------------------------------------
|
||
|
||
VALID_STATUSES = {"triage", "todo", "ready", "running", "blocked", "done", "archived"}
|
||
VALID_WORKSPACE_KINDS = {"scratch", "worktree", "dir"}
|
||
|
||
# A running task's claim is valid for 15 minutes; after that the next
|
||
# dispatcher tick reclaims it. Workers that outlive this window should call
|
||
# ``heartbeat_claim(task_id)`` periodically. In practice most kanban
|
||
# workloads either finish within 15m or set a longer claim explicitly.
|
||
DEFAULT_CLAIM_TTL_SECONDS = 15 * 60
|
||
|
||
|
||
# Worker-context caps so build_worker_context() stays bounded on
|
||
# pathological boards (retry-heavy tasks, comment storms, giant
|
||
# summaries). Values chosen to fit a typical 100k-char LLM prompt with
|
||
# plenty of headroom. Each constant is tuned independently so users
|
||
# who need to relax one don't have to relax all of them.
|
||
_CTX_MAX_PRIOR_ATTEMPTS = 10 # most recent N prior runs shown in full
|
||
_CTX_MAX_COMMENTS = 30 # most recent N comments shown in full
|
||
_CTX_MAX_FIELD_BYTES = 4 * 1024 # 4 KB per summary/error/metadata/result
|
||
_CTX_MAX_BODY_BYTES = 8 * 1024 # 8 KB per task.body (opening post)
|
||
_CTX_MAX_COMMENT_BYTES = 2 * 1024 # 2 KB per comment
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Paths
|
||
# ---------------------------------------------------------------------------
|
||
|
||
DEFAULT_BOARD = "default"
|
||
|
||
# Slug validator: lowercase alphanumerics, digits, hyphens; 1–64 chars.
|
||
# Strict enough to stop traversal (`..`) and embedded path separators, loose
|
||
# enough that kebab-case names like ``atm10-server`` or ``hermes-agent``
|
||
# pass without fuss. Board names with display formatting (spaces, emoji)
|
||
# live in ``board.json``; the slug is just the directory name.
|
||
_BOARD_SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9\-_]{0,63}$")
|
||
|
||
|
||
def _normalize_board_slug(slug: Optional[str]) -> Optional[str]:
|
||
"""Lowercase + strip a slug; validate; return ``None`` for empty."""
|
||
if slug is None:
|
||
return None
|
||
s = str(slug).strip().lower()
|
||
if not s:
|
||
return None
|
||
if not _BOARD_SLUG_RE.match(s):
|
||
raise ValueError(
|
||
f"invalid board slug {slug!r}: must be 1-64 chars, lowercase "
|
||
f"alphanumerics / hyphens / underscores, not starting with '-' or '_'"
|
||
)
|
||
return s
|
||
|
||
|
||
def kanban_home() -> Path:
|
||
"""Return the shared Hermes root that anchors the kanban board.
|
||
|
||
Resolution order:
|
||
|
||
1. ``HERMES_KANBAN_HOME`` env var when set and non-empty (explicit
|
||
override for tests and unusual deployments).
|
||
2. ``get_default_hermes_root()``, which already returns ``<root>``
|
||
when ``HERMES_HOME`` is ``<root>/profiles/<name>``, and returns
|
||
``HERMES_HOME`` directly for Docker / custom deployments.
|
||
|
||
The kanban board is shared across profiles **by design** (see the
|
||
module docstring). Resolving the kanban paths through the active
|
||
profile's ``HERMES_HOME`` would silently fork the board per profile,
|
||
which breaks the dispatcher / worker handoff.
|
||
"""
|
||
override = os.environ.get("HERMES_KANBAN_HOME", "").strip()
|
||
if override:
|
||
return Path(override).expanduser()
|
||
from hermes_constants import get_default_hermes_root
|
||
return get_default_hermes_root()
|
||
|
||
|
||
def boards_root() -> Path:
|
||
"""Return ``<root>/kanban/boards`` — the parent of non-default board dirs.
|
||
|
||
``default`` is intentionally NOT under this directory — its DB lives at
|
||
``<root>/kanban.db`` for back-compat with pre-boards installs. This
|
||
function returns the directory where *additional* named boards live,
|
||
used by :func:`list_boards` to enumerate them.
|
||
"""
|
||
return kanban_home() / "kanban" / "boards"
|
||
|
||
|
||
def current_board_path() -> Path:
|
||
"""Return the path to ``<root>/kanban/current``.
|
||
|
||
One-line text file written by ``hermes kanban boards switch <slug>``
|
||
to persist the user's board selection across CLI invocations. Absent
|
||
by default (meaning: active board is ``default``).
|
||
"""
|
||
return kanban_home() / "kanban" / "current"
|
||
|
||
|
||
def get_current_board() -> str:
|
||
"""Return the active board slug, honouring the resolution chain.
|
||
|
||
Order (highest precedence first):
|
||
|
||
1. ``HERMES_KANBAN_BOARD`` env var (set by the dispatcher on worker
|
||
spawn, or manually for ad-hoc overrides).
|
||
2. ``<root>/kanban/current`` on disk (set by ``hermes kanban boards
|
||
switch``).
|
||
3. ``DEFAULT_BOARD`` (``"default"``).
|
||
|
||
A malformed slug at any step falls through to the next layer with a
|
||
best-effort warning — the dispatcher must never crash because a user
|
||
hand-edited a file.
|
||
"""
|
||
env = os.environ.get("HERMES_KANBAN_BOARD", "").strip()
|
||
if env:
|
||
try:
|
||
normed = _normalize_board_slug(env)
|
||
if normed:
|
||
return normed
|
||
except ValueError:
|
||
pass
|
||
try:
|
||
f = current_board_path()
|
||
if f.exists():
|
||
val = f.read_text(encoding="utf-8").strip()
|
||
if val:
|
||
try:
|
||
normed = _normalize_board_slug(val)
|
||
if normed:
|
||
return normed
|
||
except ValueError:
|
||
pass
|
||
except OSError:
|
||
pass
|
||
return DEFAULT_BOARD
|
||
|
||
|
||
def set_current_board(slug: str) -> Path:
|
||
"""Persist ``slug`` as the active board. Returns the file written.
|
||
|
||
Writes ``<root>/kanban/current``. The caller should validate the slug
|
||
exists first (via :func:`board_exists`) — this function does not —
|
||
so that ``hermes kanban boards switch <typo>`` returns an error
|
||
instead of silently pointing at nothing.
|
||
"""
|
||
normed = _normalize_board_slug(slug)
|
||
if not normed:
|
||
raise ValueError("board slug is required")
|
||
path = current_board_path()
|
||
path.parent.mkdir(parents=True, exist_ok=True)
|
||
path.write_text(normed + "\n", encoding="utf-8")
|
||
return path
|
||
|
||
|
||
def clear_current_board() -> None:
|
||
"""Remove ``<root>/kanban/current`` so the active board reverts to ``default``."""
|
||
try:
|
||
current_board_path().unlink()
|
||
except FileNotFoundError:
|
||
pass
|
||
|
||
|
||
def board_dir(board: Optional[str] = None) -> Path:
|
||
"""Return the on-disk directory for ``board``.
|
||
|
||
``default`` is ``<root>/kanban/boards/default/`` **for metadata only**
|
||
(board.json + workspaces/ + logs/). Its DB file stays at
|
||
``<root>/kanban.db`` for back-compat — see :func:`kanban_db_path`.
|
||
|
||
All other boards live at ``<root>/kanban/boards/<slug>/`` with
|
||
everything inside that directory including the ``kanban.db``.
|
||
"""
|
||
slug = _normalize_board_slug(board) or DEFAULT_BOARD
|
||
return boards_root() / slug
|
||
|
||
|
||
def board_exists(board: Optional[str] = None) -> bool:
|
||
"""Return True if the board has a DB or a metadata dir on disk.
|
||
|
||
``default`` is considered to always exist — its DB is created
|
||
on first :func:`connect` and there's no way for it to be missing
|
||
in a configuration where the kanban feature is usable at all.
|
||
"""
|
||
slug = _normalize_board_slug(board) or DEFAULT_BOARD
|
||
if slug == DEFAULT_BOARD:
|
||
return True
|
||
d = board_dir(slug)
|
||
return d.is_dir() or (d / "kanban.db").exists()
|
||
|
||
|
||
def kanban_db_path(board: Optional[str] = None) -> Path:
|
||
"""Return the path to the ``kanban.db`` for ``board``.
|
||
|
||
Resolution (highest precedence first):
|
||
|
||
1. ``HERMES_KANBAN_DB`` env var — pins the path directly. Honoured for
|
||
back-compat and for the dispatcher→worker handoff (defense in
|
||
depth: dispatcher injects this into worker env so workers are
|
||
immune to any path-resolution disagreement).
|
||
2. When ``board`` arg is None, the active board from
|
||
:func:`get_current_board` is used.
|
||
3. Board ``default`` → ``<root>/kanban.db`` (back-compat path).
|
||
Other boards → ``<root>/kanban/boards/<slug>/kanban.db``.
|
||
"""
|
||
override = os.environ.get("HERMES_KANBAN_DB", "").strip()
|
||
if override:
|
||
return Path(override).expanduser()
|
||
slug = _normalize_board_slug(board)
|
||
if slug is None:
|
||
slug = get_current_board()
|
||
if slug == DEFAULT_BOARD:
|
||
return kanban_home() / "kanban.db"
|
||
return board_dir(slug) / "kanban.db"
|
||
|
||
|
||
def workspaces_root(board: Optional[str] = None) -> Path:
|
||
"""Return the directory under which ``scratch`` workspaces are created.
|
||
|
||
Anchored per-board so workspaces don't leak between projects.
|
||
``HERMES_KANBAN_WORKSPACES_ROOT`` pins the path directly (highest
|
||
precedence) — the dispatcher injects this into worker env.
|
||
|
||
``default`` keeps the legacy path ``<root>/kanban/workspaces/`` so
|
||
that existing scratch workspaces from before the boards feature are
|
||
preserved. Other boards use ``<root>/kanban/boards/<slug>/workspaces/``.
|
||
"""
|
||
override = os.environ.get("HERMES_KANBAN_WORKSPACES_ROOT", "").strip()
|
||
if override:
|
||
return Path(override).expanduser()
|
||
slug = _normalize_board_slug(board)
|
||
if slug is None:
|
||
slug = get_current_board()
|
||
if slug == DEFAULT_BOARD:
|
||
return kanban_home() / "kanban" / "workspaces"
|
||
return board_dir(slug) / "workspaces"
|
||
|
||
|
||
def worker_logs_dir(board: Optional[str] = None) -> Path:
|
||
"""Return the directory under which per-task worker logs are written.
|
||
|
||
``default`` keeps the legacy path ``<root>/kanban/logs/``. Other
|
||
boards use ``<root>/kanban/boards/<slug>/logs/``. Logs follow the
|
||
board — makes ``hermes kanban log`` unambiguous even when multiple
|
||
boards have tasks with the same id.
|
||
"""
|
||
slug = _normalize_board_slug(board)
|
||
if slug is None:
|
||
slug = get_current_board()
|
||
if slug == DEFAULT_BOARD:
|
||
return kanban_home() / "kanban" / "logs"
|
||
return board_dir(slug) / "logs"
|
||
|
||
|
||
def board_metadata_path(board: Optional[str] = None) -> Path:
|
||
"""Return the path to ``board.json`` for ``board``.
|
||
|
||
Stores display metadata (display name, description, icon, color,
|
||
created_at). The on-disk slug is the canonical identity; this file
|
||
is purely for presentation in the CLI / dashboard.
|
||
"""
|
||
slug = _normalize_board_slug(board) or DEFAULT_BOARD
|
||
return board_dir(slug) / "board.json"
|
||
|
||
|
||
def _default_board_display_name(slug: str) -> str:
|
||
"""Turn a slug into a reasonable default display name.
|
||
|
||
``atm10-server`` → ``Atm10 Server``. Users can override via
|
||
``board.json`` but the default should look presentable in the
|
||
dashboard without any follow-up editing.
|
||
"""
|
||
return " ".join(part.capitalize() for part in slug.replace("_", "-").split("-") if part) or slug
|
||
|
||
|
||
def read_board_metadata(board: Optional[str] = None) -> dict:
|
||
"""Return ``board.json`` contents (or synthesized defaults).
|
||
|
||
Never raises — a missing / malformed ``board.json`` falls back to a
|
||
synthesised entry so the dashboard always has something to render.
|
||
Includes the canonical ``slug`` and ``db_path`` so the caller
|
||
doesn't need to reconstruct them.
|
||
"""
|
||
slug = _normalize_board_slug(board) or DEFAULT_BOARD
|
||
meta: dict[str, Any] = {
|
||
"slug": slug,
|
||
"name": _default_board_display_name(slug),
|
||
"description": "",
|
||
"icon": "",
|
||
"color": "",
|
||
"created_at": None,
|
||
"archived": False,
|
||
}
|
||
try:
|
||
p = board_metadata_path(slug)
|
||
if p.exists():
|
||
raw = json.loads(p.read_text(encoding="utf-8"))
|
||
if isinstance(raw, dict):
|
||
# Never let the metadata file claim a different slug than
|
||
# its directory — trust the filesystem.
|
||
raw["slug"] = slug
|
||
meta.update(raw)
|
||
except (OSError, json.JSONDecodeError):
|
||
pass
|
||
meta["db_path"] = str(kanban_db_path(slug))
|
||
return meta
|
||
|
||
|
||
def write_board_metadata(
|
||
board: Optional[str],
|
||
*,
|
||
name: Optional[str] = None,
|
||
description: Optional[str] = None,
|
||
icon: Optional[str] = None,
|
||
color: Optional[str] = None,
|
||
archived: Optional[bool] = None,
|
||
) -> dict:
|
||
"""Create / update ``board.json`` for ``board``.
|
||
|
||
Preserves any existing fields not mentioned in the call. Sets
|
||
``created_at`` on first write. Returns the resulting metadata dict.
|
||
"""
|
||
slug = _normalize_board_slug(board) or DEFAULT_BOARD
|
||
meta = read_board_metadata(slug)
|
||
# Preserve existing DB-derived fields — they get re-computed each
|
||
# read but shouldn't be written into board.json.
|
||
meta.pop("db_path", None)
|
||
if name is not None:
|
||
meta["name"] = str(name).strip() or _default_board_display_name(slug)
|
||
if description is not None:
|
||
meta["description"] = str(description)
|
||
if icon is not None:
|
||
meta["icon"] = str(icon)
|
||
if color is not None:
|
||
meta["color"] = str(color)
|
||
if archived is not None:
|
||
meta["archived"] = bool(archived)
|
||
if not meta.get("created_at"):
|
||
meta["created_at"] = int(time.time())
|
||
path = board_metadata_path(slug)
|
||
path.parent.mkdir(parents=True, exist_ok=True)
|
||
path.write_text(
|
||
json.dumps(meta, indent=2, ensure_ascii=False) + "\n",
|
||
encoding="utf-8",
|
||
)
|
||
meta["db_path"] = str(kanban_db_path(slug))
|
||
return meta
|
||
|
||
|
||
def create_board(
|
||
slug: str,
|
||
*,
|
||
name: Optional[str] = None,
|
||
description: Optional[str] = None,
|
||
icon: Optional[str] = None,
|
||
color: Optional[str] = None,
|
||
) -> dict:
|
||
"""Create a new board directory + DB + metadata. Idempotent.
|
||
|
||
Returns the resulting metadata. Raises :class:`ValueError` for a
|
||
malformed slug; returns the existing metadata (not an error) if the
|
||
board already exists — matching ``mkdir -p`` semantics.
|
||
"""
|
||
normed = _normalize_board_slug(slug)
|
||
if not normed:
|
||
raise ValueError("board slug is required")
|
||
meta = write_board_metadata(
|
||
normed,
|
||
name=name,
|
||
description=description,
|
||
icon=icon,
|
||
color=color,
|
||
)
|
||
# Touch the DB so list_boards() sees it immediately.
|
||
init_db(board=normed)
|
||
return meta
|
||
|
||
|
||
def list_boards(*, include_archived: bool = True) -> list[dict]:
|
||
"""Enumerate all boards that exist on disk.
|
||
|
||
Always includes ``default`` (even when the ``boards/default/``
|
||
metadata dir doesn't exist, because its DB is at the legacy path).
|
||
Other boards are discovered by scanning ``boards/`` for subdirectories
|
||
that either contain a ``kanban.db`` or a ``board.json``.
|
||
|
||
Returns a list of metadata dicts, sorted with ``default`` first and
|
||
the rest alphabetically.
|
||
"""
|
||
entries: list[dict] = []
|
||
seen: set[str] = set()
|
||
|
||
# Default board is always first.
|
||
entries.append(read_board_metadata(DEFAULT_BOARD))
|
||
seen.add(DEFAULT_BOARD)
|
||
|
||
root = boards_root()
|
||
if root.is_dir():
|
||
for child in sorted(root.iterdir(), key=lambda p: p.name.lower()):
|
||
if not child.is_dir():
|
||
continue
|
||
slug = child.name
|
||
# Keep slug normalisation soft for discovery — but skip dirs
|
||
# that don't parse as valid slugs so we don't surface junk.
|
||
try:
|
||
normed = _normalize_board_slug(slug)
|
||
except ValueError:
|
||
continue
|
||
if not normed or normed in seen:
|
||
continue
|
||
has_db = (child / "kanban.db").exists()
|
||
has_meta = (child / "board.json").exists()
|
||
if not (has_db or has_meta):
|
||
continue
|
||
meta = read_board_metadata(normed)
|
||
if meta.get("archived") and not include_archived:
|
||
continue
|
||
entries.append(meta)
|
||
seen.add(normed)
|
||
return entries
|
||
|
||
|
||
def remove_board(slug: str, *, archive: bool = True) -> dict:
|
||
"""Remove or archive a board.
|
||
|
||
``archive=True`` (default) moves the board's directory to
|
||
``<root>/kanban/boards/_archived/<slug>-<timestamp>/`` so the data
|
||
is recoverable. ``archive=False`` deletes the directory outright.
|
||
|
||
The ``default`` board cannot be removed — raises :class:`ValueError`.
|
||
Returns a summary dict describing what happened (``{"slug", "action",
|
||
"new_path"}``).
|
||
"""
|
||
normed = _normalize_board_slug(slug)
|
||
if not normed:
|
||
raise ValueError("board slug is required")
|
||
if normed == DEFAULT_BOARD:
|
||
raise ValueError("the 'default' board cannot be removed")
|
||
d = board_dir(normed)
|
||
if not d.exists():
|
||
raise ValueError(f"board {normed!r} does not exist")
|
||
|
||
# If the user removed the currently-active board, revert to default.
|
||
if get_current_board() == normed:
|
||
clear_current_board()
|
||
|
||
if archive:
|
||
archive_root = boards_root() / "_archived"
|
||
archive_root.mkdir(parents=True, exist_ok=True)
|
||
ts = int(time.time())
|
||
target = archive_root / f"{normed}-{ts}"
|
||
# Avoid collision on rapid double-archives.
|
||
suffix = 1
|
||
while target.exists():
|
||
target = archive_root / f"{normed}-{ts}-{suffix}"
|
||
suffix += 1
|
||
d.rename(target)
|
||
return {"slug": normed, "action": "archived", "new_path": str(target)}
|
||
else:
|
||
import shutil
|
||
shutil.rmtree(d)
|
||
return {"slug": normed, "action": "deleted", "new_path": ""}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Data classes
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dataclass
|
||
class Task:
|
||
"""In-memory view of a row from the ``tasks`` table."""
|
||
|
||
id: str
|
||
title: str
|
||
body: Optional[str]
|
||
assignee: Optional[str]
|
||
status: str
|
||
priority: int
|
||
created_by: Optional[str]
|
||
created_at: int
|
||
started_at: Optional[int]
|
||
completed_at: Optional[int]
|
||
workspace_kind: str
|
||
workspace_path: Optional[str]
|
||
claim_lock: Optional[str]
|
||
claim_expires: Optional[int]
|
||
tenant: Optional[str]
|
||
result: Optional[str] = None
|
||
idempotency_key: Optional[str] = None
|
||
spawn_failures: int = 0
|
||
worker_pid: Optional[int] = None
|
||
last_spawn_error: Optional[str] = None
|
||
max_runtime_seconds: Optional[int] = None
|
||
last_heartbeat_at: Optional[int] = None
|
||
current_run_id: Optional[int] = None
|
||
workflow_template_id: Optional[str] = None
|
||
current_step_key: Optional[str] = None
|
||
# Force-loaded skills for the worker on this task (appended to the
|
||
# dispatcher's built-in `kanban-worker` via --skills). Stored as a
|
||
# JSON array of skill names. None = use only the defaults; empty
|
||
# list = explicitly no extra skills.
|
||
skills: Optional[list] = None
|
||
|
||
@classmethod
|
||
def from_row(cls, row: sqlite3.Row) -> "Task":
|
||
keys = set(row.keys())
|
||
# Parse skills JSON blob if present
|
||
skills_value: Optional[list] = None
|
||
if "skills" in keys and row["skills"]:
|
||
try:
|
||
parsed = json.loads(row["skills"])
|
||
if isinstance(parsed, list):
|
||
skills_value = [str(s) for s in parsed if s]
|
||
except Exception:
|
||
skills_value = None
|
||
return cls(
|
||
id=row["id"],
|
||
title=row["title"],
|
||
body=row["body"],
|
||
assignee=row["assignee"],
|
||
status=row["status"],
|
||
priority=row["priority"],
|
||
created_by=row["created_by"],
|
||
created_at=row["created_at"],
|
||
started_at=row["started_at"],
|
||
completed_at=row["completed_at"],
|
||
workspace_kind=row["workspace_kind"],
|
||
workspace_path=row["workspace_path"],
|
||
claim_lock=row["claim_lock"],
|
||
claim_expires=row["claim_expires"],
|
||
tenant=row["tenant"] if "tenant" in keys else None,
|
||
result=row["result"] if "result" in keys else None,
|
||
idempotency_key=row["idempotency_key"] if "idempotency_key" in keys else None,
|
||
spawn_failures=row["spawn_failures"] if "spawn_failures" in keys else 0,
|
||
worker_pid=row["worker_pid"] if "worker_pid" in keys else None,
|
||
last_spawn_error=row["last_spawn_error"] if "last_spawn_error" in keys else None,
|
||
max_runtime_seconds=(
|
||
row["max_runtime_seconds"] if "max_runtime_seconds" in keys else None
|
||
),
|
||
last_heartbeat_at=(
|
||
row["last_heartbeat_at"] if "last_heartbeat_at" in keys else None
|
||
),
|
||
current_run_id=(
|
||
row["current_run_id"] if "current_run_id" in keys else None
|
||
),
|
||
workflow_template_id=(
|
||
row["workflow_template_id"] if "workflow_template_id" in keys else None
|
||
),
|
||
current_step_key=(
|
||
row["current_step_key"] if "current_step_key" in keys else None
|
||
),
|
||
skills=skills_value,
|
||
)
|
||
|
||
|
||
@dataclass
|
||
class Run:
|
||
"""In-memory view of a ``task_runs`` row.
|
||
|
||
A run is one attempt to execute a task — created on claim, closed
|
||
on complete/block/crash/timeout/spawn_failure/reclaim. Multiple runs
|
||
per task when retries happen. Carries the claim machinery, PID,
|
||
heartbeat, and the structured handoff summary that downstream workers
|
||
read via ``build_worker_context``.
|
||
"""
|
||
|
||
id: int
|
||
task_id: str
|
||
profile: Optional[str]
|
||
step_key: Optional[str]
|
||
status: str
|
||
claim_lock: Optional[str]
|
||
claim_expires: Optional[int]
|
||
worker_pid: Optional[int]
|
||
max_runtime_seconds: Optional[int]
|
||
last_heartbeat_at: Optional[int]
|
||
started_at: int
|
||
ended_at: Optional[int]
|
||
outcome: Optional[str]
|
||
summary: Optional[str]
|
||
metadata: Optional[dict]
|
||
error: Optional[str]
|
||
|
||
@classmethod
|
||
def from_row(cls, row: sqlite3.Row) -> "Run":
|
||
try:
|
||
meta = json.loads(row["metadata"]) if row["metadata"] else None
|
||
except Exception:
|
||
meta = None
|
||
return cls(
|
||
id=int(row["id"]),
|
||
task_id=row["task_id"],
|
||
profile=row["profile"],
|
||
step_key=row["step_key"],
|
||
status=row["status"],
|
||
claim_lock=row["claim_lock"],
|
||
claim_expires=row["claim_expires"],
|
||
worker_pid=row["worker_pid"],
|
||
max_runtime_seconds=row["max_runtime_seconds"],
|
||
last_heartbeat_at=row["last_heartbeat_at"],
|
||
started_at=int(row["started_at"]),
|
||
ended_at=(int(row["ended_at"]) if row["ended_at"] is not None else None),
|
||
outcome=row["outcome"],
|
||
summary=row["summary"],
|
||
metadata=meta,
|
||
error=row["error"],
|
||
)
|
||
|
||
|
||
@dataclass
|
||
class Comment:
|
||
id: int
|
||
task_id: str
|
||
author: str
|
||
body: str
|
||
created_at: int
|
||
|
||
|
||
@dataclass
|
||
class Event:
|
||
id: int
|
||
task_id: str
|
||
kind: str
|
||
payload: Optional[dict]
|
||
created_at: int
|
||
run_id: Optional[int] = None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Schema
|
||
# ---------------------------------------------------------------------------
|
||
|
||
SCHEMA_SQL = """
|
||
CREATE TABLE IF NOT EXISTS tasks (
|
||
id TEXT PRIMARY KEY,
|
||
title TEXT NOT NULL,
|
||
body TEXT,
|
||
assignee TEXT,
|
||
status TEXT NOT NULL,
|
||
priority INTEGER DEFAULT 0,
|
||
created_by TEXT,
|
||
created_at INTEGER NOT NULL,
|
||
started_at INTEGER,
|
||
completed_at INTEGER,
|
||
workspace_kind TEXT NOT NULL DEFAULT 'scratch',
|
||
workspace_path TEXT,
|
||
claim_lock TEXT,
|
||
claim_expires INTEGER,
|
||
tenant TEXT,
|
||
result TEXT,
|
||
idempotency_key TEXT,
|
||
spawn_failures INTEGER NOT NULL DEFAULT 0,
|
||
worker_pid INTEGER,
|
||
last_spawn_error TEXT,
|
||
max_runtime_seconds INTEGER,
|
||
last_heartbeat_at INTEGER,
|
||
-- Pointer into task_runs for the currently-active run (NULL if no
|
||
-- run is in-flight). Denormalised for cheap reads.
|
||
current_run_id INTEGER,
|
||
-- Forward-compat for v2 workflow routing. In v1 the kernel writes
|
||
-- these when the task is opted into a template but otherwise ignores
|
||
-- them; the dispatcher doesn't consult them for routing yet.
|
||
workflow_template_id TEXT,
|
||
current_step_key TEXT,
|
||
-- Force-loaded skills for the worker on this task, stored as JSON.
|
||
-- Appended to the dispatcher's built-in `--skills kanban-worker`.
|
||
-- NULL or empty array = no extras.
|
||
skills TEXT
|
||
);
|
||
|
||
CREATE TABLE IF NOT EXISTS task_links (
|
||
parent_id TEXT NOT NULL,
|
||
child_id TEXT NOT NULL,
|
||
PRIMARY KEY (parent_id, child_id)
|
||
);
|
||
|
||
CREATE TABLE IF NOT EXISTS task_comments (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
task_id TEXT NOT NULL,
|
||
author TEXT NOT NULL,
|
||
body TEXT NOT NULL,
|
||
created_at INTEGER NOT NULL
|
||
);
|
||
|
||
CREATE TABLE IF NOT EXISTS task_events (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
task_id TEXT NOT NULL,
|
||
run_id INTEGER,
|
||
kind TEXT NOT NULL,
|
||
payload TEXT,
|
||
created_at INTEGER NOT NULL
|
||
);
|
||
|
||
-- Historical attempt record. Each time the dispatcher claims a task, a
|
||
-- new row is created here; claim state, PID, heartbeat, runtime cap,
|
||
-- and structured summary all live on the run, not the task. Multiple
|
||
-- rows per task id when the task was retried after crash/timeout/block.
|
||
-- v2 of the kanban schema will use ``step_key`` to drive per-stage
|
||
-- workflow routing; in v1 the column is nullable and unused (kernel
|
||
-- ignores it).
|
||
CREATE TABLE IF NOT EXISTS task_runs (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
task_id TEXT NOT NULL,
|
||
profile TEXT,
|
||
step_key TEXT,
|
||
status TEXT NOT NULL,
|
||
-- status: running | done | blocked | crashed | timed_out | failed | released
|
||
claim_lock TEXT,
|
||
claim_expires INTEGER,
|
||
worker_pid INTEGER,
|
||
max_runtime_seconds INTEGER,
|
||
last_heartbeat_at INTEGER,
|
||
started_at INTEGER NOT NULL,
|
||
ended_at INTEGER,
|
||
outcome TEXT,
|
||
-- outcome: completed | blocked | crashed | timed_out | spawn_failed |
|
||
-- gave_up | reclaimed | (null while still running)
|
||
summary TEXT,
|
||
metadata TEXT,
|
||
error TEXT
|
||
);
|
||
|
||
-- Subscription from a gateway source (platform + chat + thread) to a
|
||
-- task. The gateway's kanban-notifier watcher tails task_events and
|
||
-- pushes ``completed`` / ``blocked`` / ``spawn_auto_blocked`` events to
|
||
-- the original requester so human-in-the-loop workflows close the loop.
|
||
CREATE TABLE IF NOT EXISTS kanban_notify_subs (
|
||
task_id TEXT NOT NULL,
|
||
platform TEXT NOT NULL,
|
||
chat_id TEXT NOT NULL,
|
||
thread_id TEXT NOT NULL DEFAULT '',
|
||
user_id TEXT,
|
||
created_at INTEGER NOT NULL,
|
||
last_event_id INTEGER NOT NULL DEFAULT 0,
|
||
PRIMARY KEY (task_id, platform, chat_id, thread_id)
|
||
);
|
||
|
||
CREATE INDEX IF NOT EXISTS idx_tasks_assignee_status ON tasks(assignee, status);
|
||
CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status);
|
||
CREATE INDEX IF NOT EXISTS idx_tasks_tenant ON tasks(tenant);
|
||
CREATE INDEX IF NOT EXISTS idx_tasks_idempotency ON tasks(idempotency_key);
|
||
CREATE INDEX IF NOT EXISTS idx_links_child ON task_links(child_id);
|
||
CREATE INDEX IF NOT EXISTS idx_links_parent ON task_links(parent_id);
|
||
CREATE INDEX IF NOT EXISTS idx_comments_task ON task_comments(task_id, created_at);
|
||
CREATE INDEX IF NOT EXISTS idx_events_task ON task_events(task_id, created_at);
|
||
CREATE INDEX IF NOT EXISTS idx_events_run ON task_events(run_id, id);
|
||
CREATE INDEX IF NOT EXISTS idx_runs_task ON task_runs(task_id, started_at);
|
||
CREATE INDEX IF NOT EXISTS idx_runs_status ON task_runs(status);
|
||
CREATE INDEX IF NOT EXISTS idx_notify_task ON kanban_notify_subs(task_id);
|
||
"""
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Connection helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_INITIALIZED_PATHS: set[str] = set()
|
||
|
||
|
||
def connect(
|
||
db_path: Optional[Path] = None,
|
||
*,
|
||
board: Optional[str] = None,
|
||
) -> sqlite3.Connection:
|
||
"""Open (and initialize if needed) the kanban DB.
|
||
|
||
WAL mode is enabled on every connection; it's a no-op after the first
|
||
time but keeps the code robust if the DB file is ever re-created.
|
||
|
||
The first connection to a given path auto-runs :func:`init_db` so
|
||
fresh installs and test harnesses that construct `connect()`
|
||
directly don't have to remember a separate init step. Subsequent
|
||
connections skip the schema check via a module-level path cache.
|
||
|
||
Path resolution:
|
||
|
||
* ``db_path`` explicit → used as-is (legacy callers, tests).
|
||
* ``board`` explicit → resolves to that board's DB.
|
||
* Neither → :func:`kanban_db_path` resolves via
|
||
``HERMES_KANBAN_DB`` env → ``HERMES_KANBAN_BOARD`` env →
|
||
``<root>/kanban/current`` → ``default``.
|
||
"""
|
||
if db_path is not None:
|
||
path = db_path
|
||
else:
|
||
path = kanban_db_path(board=board)
|
||
path.parent.mkdir(parents=True, exist_ok=True)
|
||
resolved = str(path.resolve())
|
||
needs_init = resolved not in _INITIALIZED_PATHS
|
||
conn = sqlite3.connect(str(path), isolation_level=None, timeout=30)
|
||
conn.row_factory = sqlite3.Row
|
||
conn.execute("PRAGMA journal_mode=WAL")
|
||
conn.execute("PRAGMA synchronous=NORMAL")
|
||
conn.execute("PRAGMA foreign_keys=ON")
|
||
if needs_init:
|
||
# Idempotent: runs CREATE TABLE IF NOT EXISTS + the additive
|
||
# migrations. Cached so subsequent connect() calls in the same
|
||
# process are cheap.
|
||
conn.executescript(SCHEMA_SQL)
|
||
_migrate_add_optional_columns(conn)
|
||
_INITIALIZED_PATHS.add(resolved)
|
||
return conn
|
||
|
||
|
||
def init_db(
|
||
db_path: Optional[Path] = None,
|
||
*,
|
||
board: Optional[str] = None,
|
||
) -> Path:
|
||
"""Create the schema if it doesn't exist; return the path used.
|
||
|
||
Kept as a public entry point so CLI ``hermes kanban init`` and the
|
||
daemon have something explicit to call. Unlike :func:`connect`'s
|
||
first-time auto-init (which caches by path), ``init_db`` always
|
||
re-runs the migration pass. Callers that know the on-disk schema
|
||
may have drifted — tests that write legacy event kinds directly,
|
||
external tools that upgrade an old DB file — can call this to
|
||
force re-migration.
|
||
"""
|
||
if db_path is not None:
|
||
path = db_path
|
||
else:
|
||
path = kanban_db_path(board=board)
|
||
path.parent.mkdir(parents=True, exist_ok=True)
|
||
resolved = str(path.resolve())
|
||
# Clear the cache entry so the underlying connect() re-runs the
|
||
# schema + migration pass unconditionally.
|
||
_INITIALIZED_PATHS.discard(resolved)
|
||
with contextlib.closing(connect(path)):
|
||
pass
|
||
return path
|
||
|
||
|
||
def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None:
|
||
"""Add columns that were introduced after v1 release to legacy DBs.
|
||
|
||
Called by ``init_db`` so opening an old DB is always safe.
|
||
"""
|
||
cols = {row["name"] for row in conn.execute("PRAGMA table_info(tasks)")}
|
||
if "tenant" not in cols:
|
||
conn.execute("ALTER TABLE tasks ADD COLUMN tenant TEXT")
|
||
if "result" not in cols:
|
||
conn.execute("ALTER TABLE tasks ADD COLUMN result TEXT")
|
||
if "idempotency_key" not in cols:
|
||
conn.execute("ALTER TABLE tasks ADD COLUMN idempotency_key TEXT")
|
||
conn.execute(
|
||
"CREATE INDEX IF NOT EXISTS idx_tasks_idempotency "
|
||
"ON tasks(idempotency_key)"
|
||
)
|
||
if "spawn_failures" not in cols:
|
||
conn.execute(
|
||
"ALTER TABLE tasks ADD COLUMN spawn_failures INTEGER NOT NULL DEFAULT 0"
|
||
)
|
||
if "worker_pid" not in cols:
|
||
conn.execute("ALTER TABLE tasks ADD COLUMN worker_pid INTEGER")
|
||
if "last_spawn_error" not in cols:
|
||
conn.execute("ALTER TABLE tasks ADD COLUMN last_spawn_error TEXT")
|
||
if "max_runtime_seconds" not in cols:
|
||
conn.execute("ALTER TABLE tasks ADD COLUMN max_runtime_seconds INTEGER")
|
||
if "last_heartbeat_at" not in cols:
|
||
conn.execute("ALTER TABLE tasks ADD COLUMN last_heartbeat_at INTEGER")
|
||
if "current_run_id" not in cols:
|
||
conn.execute("ALTER TABLE tasks ADD COLUMN current_run_id INTEGER")
|
||
if "workflow_template_id" not in cols:
|
||
conn.execute("ALTER TABLE tasks ADD COLUMN workflow_template_id TEXT")
|
||
if "current_step_key" not in cols:
|
||
conn.execute("ALTER TABLE tasks ADD COLUMN current_step_key TEXT")
|
||
if "skills" not in cols:
|
||
# JSON array of skill names the dispatcher force-loads into the
|
||
# worker (additive to the built-in `kanban-worker`). NULL is fine
|
||
# for existing rows.
|
||
conn.execute("ALTER TABLE tasks ADD COLUMN skills TEXT")
|
||
|
||
# task_events gained a run_id column; back-fill it as NULL for
|
||
# historical events (they predate runs and can't be attributed).
|
||
ev_cols = {row["name"] for row in conn.execute("PRAGMA table_info(task_events)")}
|
||
if "run_id" not in ev_cols:
|
||
conn.execute("ALTER TABLE task_events ADD COLUMN run_id INTEGER")
|
||
conn.execute(
|
||
"CREATE INDEX IF NOT EXISTS idx_events_run "
|
||
"ON task_events(run_id, id)"
|
||
)
|
||
|
||
# One-shot backfill: any task that is 'running' before runs existed
|
||
# had its claim_lock / claim_expires / worker_pid on the task row.
|
||
# Synthesize a matching task_runs row so subsequent end-run / heartbeat
|
||
# calls have something to write to. Wrapped in write_txn to serialize
|
||
# against any concurrent dispatcher, and the per-row UPDATE uses
|
||
# ``current_run_id IS NULL`` as a CAS guard so a racing claim can't
|
||
# produce an orphaned row if it interleaves with the backfill pass.
|
||
runs_exist = conn.execute(
|
||
"SELECT name FROM sqlite_master WHERE type='table' AND name='task_runs'"
|
||
).fetchone() is not None
|
||
if runs_exist:
|
||
with write_txn(conn):
|
||
inflight = conn.execute(
|
||
"SELECT id, assignee, claim_lock, claim_expires, worker_pid, "
|
||
" max_runtime_seconds, last_heartbeat_at, started_at "
|
||
"FROM tasks "
|
||
"WHERE status = 'running' AND current_run_id IS NULL"
|
||
).fetchall()
|
||
for row in inflight:
|
||
started = row["started_at"] or int(time.time())
|
||
cur = conn.execute(
|
||
"""
|
||
INSERT INTO task_runs (
|
||
task_id, profile, status,
|
||
claim_lock, claim_expires, worker_pid,
|
||
max_runtime_seconds, last_heartbeat_at,
|
||
started_at
|
||
) VALUES (?, ?, 'running', ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
row["id"], row["assignee"], row["claim_lock"],
|
||
row["claim_expires"], row["worker_pid"],
|
||
row["max_runtime_seconds"], row["last_heartbeat_at"],
|
||
started,
|
||
),
|
||
)
|
||
# CAS: only install the pointer if nothing else claimed
|
||
# the task between our SELECT and here (shouldn't happen
|
||
# under the write_txn, but belt-and-suspenders). If the
|
||
# CAS fails we've got an orphan run_row — mark it
|
||
# reclaimed so it doesn't look in-flight.
|
||
upd = conn.execute(
|
||
"UPDATE tasks SET current_run_id = ? "
|
||
"WHERE id = ? AND current_run_id IS NULL",
|
||
(cur.lastrowid, row["id"]),
|
||
)
|
||
if upd.rowcount != 1:
|
||
conn.execute(
|
||
"UPDATE task_runs SET status = 'reclaimed', "
|
||
" outcome = 'reclaimed', ended_at = ? "
|
||
"WHERE id = ?",
|
||
(int(time.time()), cur.lastrowid),
|
||
)
|
||
|
||
# One-shot event-kind rename pass. The old names ("ready", "priority",
|
||
# "spawn_auto_blocked") still worked but were awkward on the wire;
|
||
# rename them in-place so existing DBs migrate cleanly. Fires once
|
||
# per DB because after the UPDATE no rows match the old kinds.
|
||
_EVENT_RENAMES = (
|
||
# (old, new)
|
||
("ready", "promoted"),
|
||
("priority", "reprioritized"),
|
||
("spawn_auto_blocked", "gave_up"),
|
||
)
|
||
for old, new in _EVENT_RENAMES:
|
||
conn.execute(
|
||
"UPDATE task_events SET kind = ? WHERE kind = ?",
|
||
(new, old),
|
||
)
|
||
|
||
|
||
@contextlib.contextmanager
|
||
def write_txn(conn: sqlite3.Connection):
|
||
"""Context manager for an IMMEDIATE write transaction.
|
||
|
||
Use for any multi-statement write (creating a task + link, claiming a
|
||
task + recording an event, etc.). A claim CAS inside this context is
|
||
atomic -- at most one concurrent writer can succeed.
|
||
"""
|
||
conn.execute("BEGIN IMMEDIATE")
|
||
try:
|
||
yield conn
|
||
except Exception:
|
||
conn.execute("ROLLBACK")
|
||
raise
|
||
else:
|
||
conn.execute("COMMIT")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# ID generation
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _new_task_id() -> str:
|
||
"""Generate a short, URL-safe task id.
|
||
|
||
4 hex bytes = ~4.3B possibilities. At 10k tasks the collision
|
||
probability is ~1.2e-5; at 100k it's ~1.2e-3. Previously we used 2
|
||
hex bytes (65k possibilities) which hit the birthday paradox hard:
|
||
~5% collision probability at 1k tasks, ~50% at 10k. Callers that
|
||
care about idempotency should pass ``idempotency_key`` to
|
||
:func:`create_task` rather than rely on id uniqueness.
|
||
"""
|
||
return "t_" + secrets.token_hex(4)
|
||
|
||
|
||
def _claimer_id() -> str:
|
||
"""Return a ``host:pid`` string that identifies this claimer."""
|
||
import socket
|
||
try:
|
||
host = socket.gethostname() or "unknown"
|
||
except Exception:
|
||
host = "unknown"
|
||
return f"{host}:{os.getpid()}"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Task creation / mutation
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _canonical_assignee(assignee: Optional[str]) -> Optional[str]:
|
||
"""Lowercase-assignee normalization for Kanban rows (dashboard/CLI parity)."""
|
||
if assignee is None:
|
||
return None
|
||
from hermes_cli.profiles import normalize_profile_name
|
||
|
||
return normalize_profile_name(assignee)
|
||
|
||
|
||
def create_task(
|
||
conn: sqlite3.Connection,
|
||
*,
|
||
title: str,
|
||
body: Optional[str] = None,
|
||
assignee: Optional[str] = None,
|
||
created_by: Optional[str] = None,
|
||
workspace_kind: str = "scratch",
|
||
workspace_path: Optional[str] = None,
|
||
tenant: Optional[str] = None,
|
||
priority: int = 0,
|
||
parents: Iterable[str] = (),
|
||
triage: bool = False,
|
||
idempotency_key: Optional[str] = None,
|
||
max_runtime_seconds: Optional[int] = None,
|
||
skills: Optional[Iterable[str]] = None,
|
||
) -> str:
|
||
"""Create a new task and optionally link it under parent tasks.
|
||
|
||
Returns the new task id. Status is ``ready`` when there are no
|
||
parents (or all parents already ``done``), otherwise ``todo``.
|
||
If ``triage=True``, status is forced to ``triage`` regardless of
|
||
parents — a specifier/triager is expected to promote the task to
|
||
``todo`` once the spec is fleshed out.
|
||
|
||
If ``idempotency_key`` is provided and a non-archived task with the
|
||
same key already exists, returns the existing task's id instead of
|
||
creating a duplicate. Useful for retried webhooks / automation that
|
||
should not double-write.
|
||
|
||
``max_runtime_seconds`` caps how long a worker may run before the
|
||
dispatcher SIGTERMs (then SIGKILLs after a grace window) and
|
||
re-queues the task. ``None`` means no cap (default).
|
||
|
||
``skills`` is an optional list of skill names to force-load into
|
||
the worker when dispatched. Stored as JSON; the dispatcher passes
|
||
each name to ``hermes --skills ...`` alongside the built-in
|
||
``kanban-worker``. Use this to pin a task to a specialist skill
|
||
(e.g. ``skills=["translation"]`` so the worker loads the
|
||
translation skill regardless of the profile's default config).
|
||
"""
|
||
assignee = _canonical_assignee(assignee)
|
||
if not title or not title.strip():
|
||
raise ValueError("title is required")
|
||
if workspace_kind not in VALID_WORKSPACE_KINDS:
|
||
raise ValueError(
|
||
f"workspace_kind must be one of {sorted(VALID_WORKSPACE_KINDS)}, "
|
||
f"got {workspace_kind!r}"
|
||
)
|
||
parents = tuple(p for p in parents if p)
|
||
|
||
# Normalise + validate skills: strip whitespace, drop empties, dedupe
|
||
# (preserving order). Refuse commas inside a single name so we don't
|
||
# invisibly splatter a comma-joined string into one argv slot — the
|
||
# `hermes --skills X,Y` comma syntax is handled in the dispatcher,
|
||
# not here.
|
||
skills_list: Optional[list[str]] = None
|
||
if skills is not None:
|
||
cleaned: list[str] = []
|
||
seen: set[str] = set()
|
||
for s in skills:
|
||
if not s:
|
||
continue
|
||
name = str(s).strip()
|
||
if not name:
|
||
continue
|
||
if "," in name:
|
||
raise ValueError(
|
||
f"skill name cannot contain comma: {name!r} "
|
||
f"(pass a list of separate names instead of a comma-joined string)"
|
||
)
|
||
if name in seen:
|
||
continue
|
||
seen.add(name)
|
||
cleaned.append(name)
|
||
skills_list = cleaned
|
||
|
||
# Idempotency check — return the existing task instead of creating a
|
||
# duplicate. Done BEFORE entering write_txn to keep the fast path fast
|
||
# and to avoid holding a write lock during the lookup. Race is
|
||
# acceptable: two concurrent creators with the same key might both
|
||
# insert, at which point both rows exist but the next lookup stabilises.
|
||
if idempotency_key:
|
||
row = conn.execute(
|
||
"SELECT id FROM tasks WHERE idempotency_key = ? "
|
||
"AND status != 'archived' "
|
||
"ORDER BY created_at DESC LIMIT 1",
|
||
(idempotency_key,),
|
||
).fetchone()
|
||
if row:
|
||
return row["id"]
|
||
|
||
now = int(time.time())
|
||
|
||
# Retry once on the extremely unlikely id collision.
|
||
for attempt in range(2):
|
||
task_id = _new_task_id()
|
||
try:
|
||
with write_txn(conn):
|
||
# Determine initial status from parent status, unless the
|
||
# caller is parking this task in triage for a specifier.
|
||
if triage:
|
||
initial_status = "triage"
|
||
else:
|
||
initial_status = "ready"
|
||
if parents:
|
||
missing = _find_missing_parents(conn, parents)
|
||
if missing:
|
||
raise ValueError(f"unknown parent task(s): {', '.join(missing)}")
|
||
# If any parent is not yet done, we're todo.
|
||
rows = conn.execute(
|
||
"SELECT status FROM tasks WHERE id IN "
|
||
"(" + ",".join("?" * len(parents)) + ")",
|
||
parents,
|
||
).fetchall()
|
||
if any(r["status"] != "done" for r in rows):
|
||
initial_status = "todo"
|
||
# Even in triage mode we still need to validate parent ids
|
||
# so the eventual link rows don't dangle.
|
||
if triage and parents:
|
||
missing = _find_missing_parents(conn, parents)
|
||
if missing:
|
||
raise ValueError(f"unknown parent task(s): {', '.join(missing)}")
|
||
|
||
conn.execute(
|
||
"""
|
||
INSERT INTO tasks (
|
||
id, title, body, assignee, status, priority,
|
||
created_by, created_at, workspace_kind, workspace_path,
|
||
tenant, idempotency_key, max_runtime_seconds, skills
|
||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
task_id,
|
||
title.strip(),
|
||
body,
|
||
assignee,
|
||
initial_status,
|
||
priority,
|
||
created_by,
|
||
now,
|
||
workspace_kind,
|
||
workspace_path,
|
||
tenant,
|
||
idempotency_key,
|
||
int(max_runtime_seconds) if max_runtime_seconds else None,
|
||
json.dumps(skills_list) if skills_list is not None else None,
|
||
),
|
||
)
|
||
for pid in parents:
|
||
conn.execute(
|
||
"INSERT OR IGNORE INTO task_links (parent_id, child_id) VALUES (?, ?)",
|
||
(pid, task_id),
|
||
)
|
||
_append_event(
|
||
conn,
|
||
task_id,
|
||
"created",
|
||
{
|
||
"assignee": assignee,
|
||
"status": initial_status,
|
||
"parents": list(parents),
|
||
"tenant": tenant,
|
||
"skills": list(skills_list) if skills_list else None,
|
||
},
|
||
)
|
||
return task_id
|
||
except sqlite3.IntegrityError:
|
||
if attempt == 1:
|
||
raise
|
||
# Retry with a fresh id.
|
||
continue
|
||
raise RuntimeError("unreachable")
|
||
|
||
|
||
def _find_missing_parents(conn: sqlite3.Connection, parents: Iterable[str]) -> list[str]:
|
||
parents = list(parents)
|
||
if not parents:
|
||
return []
|
||
placeholders = ",".join("?" * len(parents))
|
||
rows = conn.execute(
|
||
f"SELECT id FROM tasks WHERE id IN ({placeholders})",
|
||
parents,
|
||
).fetchall()
|
||
present = {r["id"] for r in rows}
|
||
return [p for p in parents if p not in present]
|
||
|
||
|
||
def get_task(conn: sqlite3.Connection, task_id: str) -> Optional[Task]:
|
||
row = conn.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)).fetchone()
|
||
return Task.from_row(row) if row else None
|
||
|
||
|
||
def list_tasks(
|
||
conn: sqlite3.Connection,
|
||
*,
|
||
assignee: Optional[str] = None,
|
||
status: Optional[str] = None,
|
||
tenant: Optional[str] = None,
|
||
include_archived: bool = False,
|
||
limit: Optional[int] = None,
|
||
) -> list[Task]:
|
||
query = "SELECT * FROM tasks WHERE 1=1"
|
||
params: list[Any] = []
|
||
if assignee is not None:
|
||
query += " AND assignee = ?"
|
||
params.append(_canonical_assignee(assignee))
|
||
if status is not None:
|
||
if status not in VALID_STATUSES:
|
||
raise ValueError(f"status must be one of {sorted(VALID_STATUSES)}")
|
||
query += " AND status = ?"
|
||
params.append(status)
|
||
if tenant is not None:
|
||
query += " AND tenant = ?"
|
||
params.append(tenant)
|
||
if not include_archived and status != "archived":
|
||
query += " AND status != 'archived'"
|
||
query += " ORDER BY priority DESC, created_at ASC"
|
||
if limit:
|
||
query += f" LIMIT {int(limit)}"
|
||
rows = conn.execute(query, params).fetchall()
|
||
return [Task.from_row(r) for r in rows]
|
||
|
||
|
||
def assign_task(conn: sqlite3.Connection, task_id: str, profile: Optional[str]) -> bool:
|
||
"""Assign or reassign a task. Returns True on success.
|
||
|
||
Refuses to reassign a task that's currently running (claim_lock set).
|
||
Reassign after the current run completes if needed.
|
||
"""
|
||
profile = _canonical_assignee(profile)
|
||
with write_txn(conn):
|
||
row = conn.execute(
|
||
"SELECT status, claim_lock FROM tasks WHERE id = ?", (task_id,)
|
||
).fetchone()
|
||
if not row:
|
||
return False
|
||
if row["claim_lock"] is not None and row["status"] == "running":
|
||
raise RuntimeError(
|
||
f"cannot reassign {task_id}: currently running (claimed). "
|
||
"Wait for completion or reclaim the stale lock first."
|
||
)
|
||
conn.execute("UPDATE tasks SET assignee = ? WHERE id = ?", (profile, task_id))
|
||
_append_event(conn, task_id, "assigned", {"assignee": profile})
|
||
return True
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Links
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def link_tasks(conn: sqlite3.Connection, parent_id: str, child_id: str) -> None:
|
||
if parent_id == child_id:
|
||
raise ValueError("a task cannot depend on itself")
|
||
with write_txn(conn):
|
||
missing = _find_missing_parents(conn, [parent_id, child_id])
|
||
if missing:
|
||
raise ValueError(f"unknown task(s): {', '.join(missing)}")
|
||
if _would_cycle(conn, parent_id, child_id):
|
||
raise ValueError(
|
||
f"linking {parent_id} -> {child_id} would create a cycle"
|
||
)
|
||
conn.execute(
|
||
"INSERT OR IGNORE INTO task_links (parent_id, child_id) VALUES (?, ?)",
|
||
(parent_id, child_id),
|
||
)
|
||
# If child was ready but parent is not yet done, demote child to todo.
|
||
parent_status = conn.execute(
|
||
"SELECT status FROM tasks WHERE id = ?", (parent_id,)
|
||
).fetchone()["status"]
|
||
if parent_status != "done":
|
||
conn.execute(
|
||
"UPDATE tasks SET status = 'todo' WHERE id = ? AND status = 'ready'",
|
||
(child_id,),
|
||
)
|
||
_append_event(
|
||
conn, child_id, "linked",
|
||
{"parent": parent_id, "child": child_id},
|
||
)
|
||
|
||
|
||
def _would_cycle(conn: sqlite3.Connection, parent_id: str, child_id: str) -> bool:
|
||
"""Return True if adding parent->child creates a cycle.
|
||
|
||
A cycle exists iff ``parent_id`` is already a descendant of
|
||
``child_id`` via existing parent->child links. We walk downward
|
||
from ``child_id`` and check whether we reach ``parent_id``.
|
||
"""
|
||
seen = set()
|
||
stack = [child_id]
|
||
while stack:
|
||
node = stack.pop()
|
||
if node == parent_id:
|
||
return True
|
||
if node in seen:
|
||
continue
|
||
seen.add(node)
|
||
rows = conn.execute(
|
||
"SELECT child_id FROM task_links WHERE parent_id = ?", (node,)
|
||
).fetchall()
|
||
stack.extend(r["child_id"] for r in rows)
|
||
return False
|
||
|
||
|
||
def unlink_tasks(conn: sqlite3.Connection, parent_id: str, child_id: str) -> bool:
|
||
with write_txn(conn):
|
||
cur = conn.execute(
|
||
"DELETE FROM task_links WHERE parent_id = ? AND child_id = ?",
|
||
(parent_id, child_id),
|
||
)
|
||
if cur.rowcount:
|
||
_append_event(
|
||
conn, child_id, "unlinked",
|
||
{"parent": parent_id, "child": child_id},
|
||
)
|
||
return cur.rowcount > 0
|
||
|
||
|
||
def parent_ids(conn: sqlite3.Connection, task_id: str) -> list[str]:
|
||
rows = conn.execute(
|
||
"SELECT parent_id FROM task_links WHERE child_id = ? ORDER BY parent_id",
|
||
(task_id,),
|
||
).fetchall()
|
||
return [r["parent_id"] for r in rows]
|
||
|
||
|
||
def child_ids(conn: sqlite3.Connection, task_id: str) -> list[str]:
|
||
rows = conn.execute(
|
||
"SELECT child_id FROM task_links WHERE parent_id = ? ORDER BY child_id",
|
||
(task_id,),
|
||
).fetchall()
|
||
return [r["child_id"] for r in rows]
|
||
|
||
|
||
def parent_results(conn: sqlite3.Connection, task_id: str) -> list[tuple[str, Optional[str]]]:
|
||
"""Return ``(parent_id, result)`` for every done parent of ``task_id``."""
|
||
rows = conn.execute(
|
||
"""
|
||
SELECT t.id AS id, t.result AS result
|
||
FROM tasks t
|
||
JOIN task_links l ON l.parent_id = t.id
|
||
WHERE l.child_id = ? AND t.status = 'done'
|
||
ORDER BY t.completed_at ASC
|
||
""",
|
||
(task_id,),
|
||
).fetchall()
|
||
return [(r["id"], r["result"]) for r in rows]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Comments & events
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def add_comment(
|
||
conn: sqlite3.Connection, task_id: str, author: str, body: str
|
||
) -> int:
|
||
if not body or not body.strip():
|
||
raise ValueError("comment body is required")
|
||
if not author or not author.strip():
|
||
raise ValueError("comment author is required")
|
||
now = int(time.time())
|
||
with write_txn(conn):
|
||
if not conn.execute(
|
||
"SELECT 1 FROM tasks WHERE id = ?", (task_id,)
|
||
).fetchone():
|
||
raise ValueError(f"unknown task {task_id}")
|
||
cur = conn.execute(
|
||
"INSERT INTO task_comments (task_id, author, body, created_at) "
|
||
"VALUES (?, ?, ?, ?)",
|
||
(task_id, author.strip(), body.strip(), now),
|
||
)
|
||
_append_event(conn, task_id, "commented", {"author": author, "len": len(body)})
|
||
return int(cur.lastrowid or 0)
|
||
|
||
|
||
def list_comments(conn: sqlite3.Connection, task_id: str) -> list[Comment]:
|
||
rows = conn.execute(
|
||
"SELECT * FROM task_comments WHERE task_id = ? ORDER BY created_at ASC",
|
||
(task_id,),
|
||
).fetchall()
|
||
return [
|
||
Comment(
|
||
id=r["id"],
|
||
task_id=r["task_id"],
|
||
author=r["author"],
|
||
body=r["body"],
|
||
created_at=r["created_at"],
|
||
)
|
||
for r in rows
|
||
]
|
||
|
||
|
||
def list_events(conn: sqlite3.Connection, task_id: str) -> list[Event]:
|
||
rows = conn.execute(
|
||
"SELECT * FROM task_events WHERE task_id = ? ORDER BY created_at ASC, id ASC",
|
||
(task_id,),
|
||
).fetchall()
|
||
out = []
|
||
for r in rows:
|
||
try:
|
||
payload = json.loads(r["payload"]) if r["payload"] else None
|
||
except Exception:
|
||
payload = None
|
||
out.append(
|
||
Event(
|
||
id=r["id"],
|
||
task_id=r["task_id"],
|
||
kind=r["kind"],
|
||
payload=payload,
|
||
created_at=r["created_at"],
|
||
run_id=(int(r["run_id"]) if "run_id" in r.keys() and r["run_id"] is not None else None),
|
||
)
|
||
)
|
||
return out
|
||
|
||
|
||
def _append_event(
|
||
conn: sqlite3.Connection,
|
||
task_id: str,
|
||
kind: str,
|
||
payload: Optional[dict] = None,
|
||
*,
|
||
run_id: Optional[int] = None,
|
||
) -> None:
|
||
"""Record an event row. Called from within an already-open txn.
|
||
|
||
``run_id`` is optional: pass the current run id so UIs can group
|
||
events by attempt. For events that aren't scoped to a single run
|
||
(task created/edited/archived, dependency promotion) leave it None
|
||
and the row carries NULL.
|
||
"""
|
||
now = int(time.time())
|
||
pl = json.dumps(payload, ensure_ascii=False) if payload else None
|
||
conn.execute(
|
||
"INSERT INTO task_events (task_id, run_id, kind, payload, created_at) "
|
||
"VALUES (?, ?, ?, ?, ?)",
|
||
(task_id, run_id, kind, pl, now),
|
||
)
|
||
|
||
|
||
def _end_run(
|
||
conn: sqlite3.Connection,
|
||
task_id: str,
|
||
*,
|
||
outcome: str,
|
||
summary: Optional[str] = None,
|
||
error: Optional[str] = None,
|
||
metadata: Optional[dict] = None,
|
||
status: Optional[str] = None,
|
||
) -> Optional[int]:
|
||
"""Close the currently-active run for ``task_id`` and clear the pointer.
|
||
|
||
``outcome`` is the semantic result (completed / blocked / crashed /
|
||
timed_out / spawn_failed / gave_up / reclaimed). ``status`` is the
|
||
run-row status (usually just ``outcome``, but callers can pass it
|
||
explicitly). Returns the closed run_id or ``None`` if no active run
|
||
existed (e.g. a CLI user calling ``hermes kanban complete`` on a
|
||
task that was never claimed).
|
||
"""
|
||
now = int(time.time())
|
||
row = conn.execute(
|
||
"SELECT current_run_id FROM tasks WHERE id = ?", (task_id,),
|
||
).fetchone()
|
||
if not row or not row["current_run_id"]:
|
||
return None
|
||
run_id = int(row["current_run_id"])
|
||
conn.execute(
|
||
"""
|
||
UPDATE task_runs
|
||
SET status = ?,
|
||
outcome = ?,
|
||
summary = ?,
|
||
error = ?,
|
||
metadata = ?,
|
||
ended_at = ?,
|
||
claim_lock = NULL,
|
||
claim_expires = NULL,
|
||
worker_pid = NULL
|
||
WHERE id = ?
|
||
AND ended_at IS NULL
|
||
""",
|
||
(
|
||
status or outcome,
|
||
outcome,
|
||
summary,
|
||
error,
|
||
json.dumps(metadata, ensure_ascii=False) if metadata else None,
|
||
now,
|
||
run_id,
|
||
),
|
||
)
|
||
conn.execute(
|
||
"UPDATE tasks SET current_run_id = NULL WHERE id = ?", (task_id,),
|
||
)
|
||
return run_id
def _current_run_id(conn: sqlite3.Connection, task_id: str) -> Optional[int]:
    row = conn.execute(
        "SELECT current_run_id FROM tasks WHERE id = ?", (task_id,),
    ).fetchone()
    return int(row["current_run_id"]) if row and row["current_run_id"] else None

def _synthesize_ended_run(
|
||
conn: sqlite3.Connection,
|
||
task_id: str,
|
||
*,
|
||
outcome: str,
|
||
summary: Optional[str] = None,
|
||
error: Optional[str] = None,
|
||
metadata: Optional[dict] = None,
|
||
) -> int:
|
||
"""Insert a zero-duration, already-closed run row.
|
||
|
||
Used when a terminal transition happens on a task that was never
|
||
claimed (CLI user calling ``hermes kanban complete <ready-task>
|
||
--summary X``, or dashboard "mark done" on a ready task). Without
|
||
this, the handoff fields (summary / metadata / error) would be
|
||
silently dropped: ``_end_run`` is a no-op because there's no
|
||
current run.
|
||
|
||
The synthetic run has ``started_at == ended_at == now`` so it
|
||
shows up in attempt history as "instant" and doesn't skew elapsed
|
||
stats. Caller is responsible for leaving ``current_run_id`` NULL
|
||
(or for clearing it elsewhere in the same txn) since this
|
||
function does NOT touch the tasks row.
|
||
"""
|
||
now = int(time.time())
|
||
trow = conn.execute(
|
||
"SELECT assignee, current_step_key FROM tasks WHERE id = ?",
|
||
(task_id,),
|
||
).fetchone()
|
||
profile = trow["assignee"] if trow else None
|
||
step_key = trow["current_step_key"] if trow else None
|
||
cur = conn.execute(
|
||
"""
|
||
INSERT INTO task_runs (
|
||
task_id, profile, step_key,
|
||
status, outcome,
|
||
summary, error, metadata,
|
||
started_at, ended_at
|
||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
task_id, profile, step_key,
|
||
outcome, outcome,
|
||
summary, error,
|
||
json.dumps(metadata, ensure_ascii=False) if metadata else None,
|
||
now, now,
|
||
),
|
||
)
|
||
return int(cur.lastrowid or 0)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Dependency resolution (todo -> ready)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def recompute_ready(conn: sqlite3.Connection) -> int:
|
||
"""Promote ``todo`` tasks to ``ready`` when all parents are ``done``.
|
||
|
||
Returns the number of tasks promoted. Safe to call inside or outside
|
||
an existing transaction; it opens its own IMMEDIATE txn.
|
||
"""
|
||
promoted = 0
|
||
with write_txn(conn):
|
||
todo_rows = conn.execute(
|
||
"SELECT id FROM tasks WHERE status = 'todo'"
|
||
).fetchall()
|
||
for row in todo_rows:
|
||
task_id = row["id"]
|
||
parents = conn.execute(
|
||
"SELECT t.status FROM tasks t "
|
||
"JOIN task_links l ON l.parent_id = t.id "
|
||
"WHERE l.child_id = ?",
|
||
(task_id,),
|
||
).fetchall()
|
||
if all(p["status"] == "done" for p in parents):
|
||
conn.execute(
|
||
"UPDATE tasks SET status = 'ready' WHERE id = ? AND status = 'todo'",
|
||
(task_id,),
|
||
)
|
||
_append_event(conn, task_id, "promoted", None)
|
||
promoted += 1
|
||
return promoted
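
# Illustrative sketch: an explicit maintenance call. The terminal transitions
# below (e.g. complete_task) already run this, so calling it by hand is only
# needed after out-of-band edits to task_links.
#
#     with contextlib.closing(connect()) as conn:
#         promoted = recompute_ready(conn)
#         if promoted:
#             print(f"promoted {promoted} task(s) from todo to ready")
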
# ---------------------------------------------------------------------------
|
||
# Claim / complete / block
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def claim_task(
|
||
conn: sqlite3.Connection,
|
||
task_id: str,
|
||
*,
|
||
ttl_seconds: int = DEFAULT_CLAIM_TTL_SECONDS,
|
||
claimer: Optional[str] = None,
|
||
) -> Optional[Task]:
|
||
"""Atomically transition ``ready -> running``.
|
||
|
||
Returns the claimed ``Task`` on success, ``None`` if the task was
|
||
already claimed (or is not in ``ready`` status).
|
||
"""
|
||
now = int(time.time())
|
||
lock = claimer or _claimer_id()
|
||
expires = now + int(ttl_seconds)
|
||
with write_txn(conn):
|
||
# Defensive: if a prior run somehow leaked (invariant violation from
|
||
# an unknown code path), close it as 'reclaimed' so we don't strand
|
||
# it when the CAS resets the pointer below. No-op when the invariant
|
||
# holds (the common case).
|
||
stale = conn.execute(
|
||
"SELECT current_run_id FROM tasks WHERE id = ? AND status = 'ready'",
|
||
(task_id,),
|
||
).fetchone()
|
||
if stale and stale["current_run_id"]:
|
||
conn.execute(
|
||
"""
|
||
UPDATE task_runs
|
||
SET status = 'reclaimed', outcome = 'reclaimed',
|
||
summary = COALESCE(summary, 'invariant recovery on re-claim'),
|
||
ended_at = ?,
|
||
claim_lock = NULL, claim_expires = NULL, worker_pid = NULL
|
||
WHERE id = ? AND ended_at IS NULL
|
||
""",
|
||
(now, int(stale["current_run_id"])),
|
||
)
|
||
cur = conn.execute(
|
||
"""
|
||
UPDATE tasks
|
||
SET status = 'running',
|
||
claim_lock = ?,
|
||
claim_expires = ?,
|
||
started_at = COALESCE(started_at, ?)
|
||
WHERE id = ?
|
||
AND status = 'ready'
|
||
AND claim_lock IS NULL
|
||
""",
|
||
(lock, expires, now, task_id),
|
||
)
|
||
if cur.rowcount != 1:
|
||
return None
|
||
# Look up the current task row so we can populate the run with
|
||
# its assignee / step / runtime cap.
|
||
trow = conn.execute(
|
||
"SELECT assignee, max_runtime_seconds, current_step_key "
|
||
"FROM tasks WHERE id = ?",
|
||
(task_id,),
|
||
).fetchone()
|
||
run_cur = conn.execute(
|
||
"""
|
||
INSERT INTO task_runs (
|
||
task_id, profile, step_key, status,
|
||
claim_lock, claim_expires, max_runtime_seconds,
|
||
started_at
|
||
) VALUES (?, ?, ?, 'running', ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
task_id,
|
||
trow["assignee"] if trow else None,
|
||
trow["current_step_key"] if trow else None,
|
||
lock,
|
||
expires,
|
||
trow["max_runtime_seconds"] if trow else None,
|
||
now,
|
||
),
|
||
)
|
||
run_id = run_cur.lastrowid
|
||
conn.execute(
|
||
"UPDATE tasks SET current_run_id = ? WHERE id = ?",
|
||
(run_id, task_id),
|
||
)
|
||
_append_event(
|
||
conn, task_id, "claimed",
|
||
{"lock": lock, "expires": expires, "run_id": run_id},
|
||
run_id=run_id,
|
||
)
|
||
return get_task(conn, task_id)
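
# Illustrative sketch (hypothetical task id): the claim is a compare-and-set
# on status + claim_lock, so two dispatchers racing on the same ready task
# cannot both win.
#
#     with contextlib.closing(connect()) as conn:
#         task = claim_task(conn, "task-0042", ttl_seconds=900)
#         if task is None:
#             pass                      # already claimed, or not in 'ready'
#         else:
#             print(task.status)        # 'running'
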
def heartbeat_claim(
|
||
conn: sqlite3.Connection,
|
||
task_id: str,
|
||
*,
|
||
ttl_seconds: int = DEFAULT_CLAIM_TTL_SECONDS,
|
||
claimer: Optional[str] = None,
|
||
) -> bool:
|
||
"""Extend a running claim. Returns True if we still own it.
|
||
|
||
Workers that know they'll exceed 15 minutes should call this every
|
||
few minutes to keep ownership.
|
||
"""
|
||
expires = int(time.time()) + int(ttl_seconds)
|
||
lock = claimer or _claimer_id()
|
||
with write_txn(conn):
|
||
cur = conn.execute(
|
||
"UPDATE tasks SET claim_expires = ? "
|
||
"WHERE id = ? AND status = 'running' AND claim_lock = ?",
|
||
(expires, task_id, lock),
|
||
)
|
||
if cur.rowcount == 1:
|
||
run_id = _current_run_id(conn, task_id)
|
||
if run_id is not None:
|
||
conn.execute(
|
||
"UPDATE task_runs SET claim_expires = ? WHERE id = ?",
|
||
(expires, run_id),
|
||
)
|
||
return True
|
||
return False
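
# Illustrative sketch: a long-running worker extending its claim from the
# same process that claimed it (so the default claimer id matches the lock).
# `task` is the Task returned by claim_task above; the work helpers are
# hypothetical.
#
#     while not work_finished():
#         do_a_chunk_of_work()
#         if not heartbeat_claim(conn, task.id):
#             break    # claim lost (TTL expired and someone else took over)
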
def release_stale_claims(conn: sqlite3.Connection) -> int:
|
||
"""Reset any ``running`` task whose claim has expired.
|
||
|
||
Returns the number of stale claims reclaimed. Safe to call often.
|
||
"""
|
||
now = int(time.time())
|
||
reclaimed = 0
|
||
with write_txn(conn):
|
||
stale = conn.execute(
|
||
"SELECT id, claim_lock FROM tasks "
|
||
"WHERE status = 'running' AND claim_expires IS NOT NULL AND claim_expires < ?",
|
||
(now,),
|
||
).fetchall()
|
||
for row in stale:
|
||
conn.execute(
|
||
"UPDATE tasks SET status = 'ready', claim_lock = NULL, "
|
||
"claim_expires = NULL, worker_pid = NULL "
|
||
"WHERE id = ? AND status = 'running'",
|
||
(row["id"],),
|
||
)
|
||
run_id = _end_run(
|
||
conn, row["id"],
|
||
outcome="reclaimed", status="reclaimed",
|
||
error=f"stale_lock={row['claim_lock']}",
|
||
)
|
||
_append_event(
|
||
conn, row["id"], "reclaimed",
|
||
{"stale_lock": row["claim_lock"]},
|
||
run_id=run_id,
|
||
)
|
||
reclaimed += 1
|
||
return reclaimed
|
||
|
||
|
||
def complete_task(
|
||
conn: sqlite3.Connection,
|
||
task_id: str,
|
||
*,
|
||
result: Optional[str] = None,
|
||
summary: Optional[str] = None,
|
||
metadata: Optional[dict] = None,
|
||
) -> bool:
|
||
"""Transition ``running|ready -> done`` and record ``result``.
|
||
|
||
    Accepts a task that's merely ``ready`` (or ``blocked``) too, so a manual CLI
|
||
completion (``hermes kanban complete <id>``) works without requiring
|
||
a claim/start/complete sequence.
|
||
|
||
``summary`` and ``metadata`` are stored on the closing run (if any)
|
||
and surfaced to downstream children via :func:`build_worker_context`.
|
||
When ``summary`` is omitted we fall back to ``result`` so single-run
|
||
callers don't have to pass both. ``metadata`` is a free-form dict
|
||
(e.g. ``{"changed_files": [...], "tests_run": [...]}``) — workers
|
||
are encouraged to use it for structured handoff facts.
|
||
"""
|
||
now = int(time.time())
|
||
with write_txn(conn):
|
||
cur = conn.execute(
|
||
"""
|
||
UPDATE tasks
|
||
SET status = 'done',
|
||
result = ?,
|
||
completed_at = ?,
|
||
claim_lock = NULL,
|
||
claim_expires= NULL,
|
||
worker_pid = NULL
|
||
WHERE id = ?
|
||
AND status IN ('running', 'ready', 'blocked')
|
||
""",
|
||
(result, now, task_id),
|
||
)
|
||
if cur.rowcount != 1:
|
||
return False
|
||
run_id = _end_run(
|
||
conn, task_id,
|
||
outcome="completed", status="done",
|
||
summary=summary if summary is not None else result,
|
||
metadata=metadata,
|
||
)
|
||
# If complete_task was called on a never-claimed task (ready or
|
||
# blocked → done with no run in flight), synthesize a
|
||
# zero-duration run so the handoff fields are persisted in
|
||
# attempt history instead of silently lost.
|
||
if run_id is None and (summary or metadata or result):
|
||
run_id = _synthesize_ended_run(
|
||
conn, task_id,
|
||
outcome="completed",
|
||
summary=summary if summary is not None else result,
|
||
metadata=metadata,
|
||
)
|
||
# Carry the handoff summary in the event payload so gateway
|
||
# notifiers and dashboard WS consumers can render it without a
|
||
# second SQL round-trip. First line only, 400 char cap — the
|
||
# full summary stays on the run row.
|
||
        ev_summary = (summary if summary is not None else result) or ""
        # Strip before splitting so a whitespace-only summary can't index
        # into an empty splitlines() list.
        ev_summary = ev_summary.strip().splitlines()[0][:400] if ev_summary.strip() else ""
|
||
_append_event(
|
||
conn, task_id, "completed",
|
||
{
|
||
"result_len": len(result) if result else 0,
|
||
"summary": ev_summary or None,
|
||
},
|
||
run_id=run_id,
|
||
)
|
||
# Recompute ready status for dependents (separate txn so children see done).
|
||
recompute_ready(conn)
|
||
return True
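
# Illustrative sketch (hypothetical id and file names): completing with a
# structured handoff so downstream children get more than a one-line result.
#
#     complete_task(
#         conn, "task-0042",
#         result="Refactored the parser; tests green.",
#         summary="Parser refactor done; metadata lists touched files.",
#         metadata={"changed_files": ["parser.py"], "tests_run": ["pytest -q"]},
#     )
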
def block_task(
|
||
conn: sqlite3.Connection,
|
||
task_id: str,
|
||
*,
|
||
reason: Optional[str] = None,
|
||
) -> bool:
|
||
"""Transition ``running -> blocked``."""
|
||
with write_txn(conn):
|
||
cur = conn.execute(
|
||
"""
|
||
UPDATE tasks
|
||
SET status = 'blocked',
|
||
claim_lock = NULL,
|
||
claim_expires= NULL,
|
||
worker_pid = NULL
|
||
WHERE id = ?
|
||
AND status IN ('running', 'ready')
|
||
""",
|
||
(task_id,),
|
||
)
|
||
if cur.rowcount != 1:
|
||
return False
|
||
run_id = _end_run(
|
||
conn, task_id,
|
||
outcome="blocked", status="blocked",
|
||
summary=reason,
|
||
)
|
||
# Synthesize a run when blocking a never-claimed task so the
|
||
# reason is preserved in attempt history.
|
||
if run_id is None and reason:
|
||
run_id = _synthesize_ended_run(
|
||
conn, task_id,
|
||
outcome="blocked",
|
||
summary=reason,
|
||
)
|
||
_append_event(conn, task_id, "blocked", {"reason": reason}, run_id=run_id)
|
||
return True
|
||
|
||
|
||
def unblock_task(conn: sqlite3.Connection, task_id: str) -> bool:
|
||
"""Transition ``blocked -> ready``.
|
||
|
||
Defensively closes any stale ``current_run_id`` pointer before flipping
|
||
status. In the common path (``block_task`` closed the run already) this
|
||
is a no-op. If a future or external write left the pointer dangling,
|
||
the leaked run is closed as ``reclaimed`` inside the same txn so the
|
||
runs invariant (``current_run_id IS NULL`` ⇔ run row in terminal
|
||
state) holds for the rest of this function's lifetime.
|
||
"""
|
||
now = int(time.time())
|
||
with write_txn(conn):
|
||
stale = conn.execute(
|
||
"SELECT current_run_id FROM tasks WHERE id = ? AND status = 'blocked'",
|
||
(task_id,),
|
||
).fetchone()
|
||
if stale and stale["current_run_id"]:
|
||
conn.execute(
|
||
"""
|
||
UPDATE task_runs
|
||
SET status = 'reclaimed', outcome = 'reclaimed',
|
||
summary = COALESCE(summary, 'invariant recovery on unblock'),
|
||
ended_at = ?,
|
||
claim_lock = NULL, claim_expires = NULL, worker_pid = NULL
|
||
WHERE id = ? AND ended_at IS NULL
|
||
""",
|
||
(now, int(stale["current_run_id"])),
|
||
)
|
||
cur = conn.execute(
|
||
"UPDATE tasks SET status = 'ready', current_run_id = NULL "
|
||
"WHERE id = ? AND status = 'blocked'",
|
||
(task_id,),
|
||
)
|
||
if cur.rowcount != 1:
|
||
return False
|
||
_append_event(conn, task_id, "unblocked", None)
|
||
return True
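
# Illustrative sketch (hypothetical id and reason): parking a task on a
# missing credential, then releasing it once a human has fixed the profile.
#
#     block_task(conn, "task-0042", reason="GITHUB_TOKEN missing in profile env")
#     ...                              # human adds the token
#     unblock_task(conn, "task-0042")  # back to 'ready'; next tick re-spawns it
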
def archive_task(conn: sqlite3.Connection, task_id: str) -> bool:
|
||
with write_txn(conn):
|
||
cur = conn.execute(
|
||
"UPDATE tasks SET status = 'archived', "
|
||
" claim_lock = NULL, claim_expires = NULL, worker_pid = NULL "
|
||
"WHERE id = ? AND status != 'archived'",
|
||
(task_id,),
|
||
)
|
||
if cur.rowcount != 1:
|
||
return False
|
||
# If archive happened while a run was still in flight (e.g. user
|
||
# archived a running task from the dashboard), close that run with
|
||
# outcome='reclaimed' so attempt history isn't orphaned.
|
||
run_id = _end_run(
|
||
conn, task_id,
|
||
outcome="reclaimed", status="reclaimed",
|
||
summary="task archived with run still active",
|
||
)
|
||
_append_event(conn, task_id, "archived", None, run_id=run_id)
|
||
return True
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Workspace resolution
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def resolve_workspace(task: Task, *, board: Optional[str] = None) -> Path:
|
||
"""Resolve (and create if needed) the workspace for a task.
|
||
|
||
- ``scratch``: a fresh dir under ``<board-root>/workspaces/<id>/``,
|
||
where ``<board-root>`` is the active board's root. The path is the
|
||
same for the dispatcher and every profile worker, so handoff is
|
||
path-stable.
|
||
- ``dir:<path>``: the path stored in ``workspace_path``. Created
|
||
if missing. MUST be absolute — relative paths are rejected to
|
||
prevent confused-deputy traversal where ``../../../tmp/attacker``
|
||
resolves against the dispatcher's CWD instead of a meaningful
|
||
root. Users who want a kanban-root-relative workspace should
|
||
compute the absolute path themselves.
|
||
- ``worktree``: a git worktree at ``workspace_path``. Not created
|
||
automatically in v1 -- the kanban-worker skill documents
|
||
``git worktree add`` as a worker-side step. Returns the intended path.
|
||
|
||
Persist the resolved path back to the task row via ``set_workspace_path``
|
||
so subsequent runs reuse the same directory.
|
||
"""
|
||
kind = task.workspace_kind or "scratch"
|
||
if kind == "scratch":
|
||
if task.workspace_path:
|
||
# Legacy scratch tasks that were set to an explicit path get the
|
||
# same absolute-path guard as dir: — consistent with the
|
||
# threat model.
|
||
p = Path(task.workspace_path).expanduser()
|
||
if not p.is_absolute():
|
||
raise ValueError(
|
||
f"task {task.id} has non-absolute workspace_path "
|
||
f"{task.workspace_path!r}; workspace paths must be absolute"
|
||
)
|
||
else:
|
||
p = workspaces_root(board=board) / task.id
|
||
p.mkdir(parents=True, exist_ok=True)
|
||
return p
|
||
if kind == "dir":
|
||
if not task.workspace_path:
|
||
raise ValueError(
|
||
f"task {task.id} has workspace_kind=dir but no workspace_path"
|
||
)
|
||
p = Path(task.workspace_path).expanduser()
|
||
if not p.is_absolute():
|
||
raise ValueError(
|
||
f"task {task.id} has non-absolute workspace_path "
|
||
f"{task.workspace_path!r}; use an absolute path "
|
||
f"(relative paths are ambiguous against the dispatcher's CWD)"
|
||
)
|
||
p.mkdir(parents=True, exist_ok=True)
|
||
return p
|
||
if kind == "worktree":
|
||
if not task.workspace_path:
|
||
# Default: .worktrees/<id>/ under CWD. Worker skill creates it.
|
||
return Path.cwd() / ".worktrees" / task.id
|
||
p = Path(task.workspace_path).expanduser()
|
||
if not p.is_absolute():
|
||
raise ValueError(
|
||
f"task {task.id} has non-absolute worktree path "
|
||
f"{task.workspace_path!r}; use an absolute path"
|
||
)
|
||
return p
|
||
raise ValueError(f"unknown workspace_kind: {kind}")
def set_workspace_path(
    conn: sqlite3.Connection, task_id: str, path: Path | str
) -> None:
    with write_txn(conn):
        conn.execute(
            "UPDATE tasks SET workspace_path = ? WHERE id = ?",
            (str(path), task_id),
        )
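
# Illustrative sketch: how the dispatcher pairs these two helpers so retries
# land in the same directory (task id hypothetical).
#
#     task = get_task(conn, "task-0042")
#     ws = resolve_workspace(task)            # scratch / dir / worktree rules above
#     set_workspace_path(conn, task.id, ws)   # persisted for the next run
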
# ---------------------------------------------------------------------------
|
||
# Dispatcher (one-shot pass)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# After this many consecutive `spawn_failed` events on a task, the dispatcher
|
||
# stops retrying and parks the task in ``blocked`` with a reason so a human
|
||
# can investigate. Prevents the dispatcher from thrashing forever on a task
|
||
# whose profile doesn't exist, whose workspace is unmountable, etc.
|
||
DEFAULT_SPAWN_FAILURE_LIMIT = 5
|
||
|
||
# Max bytes to keep in a single worker log file. The dispatcher truncates
|
||
# and rotates on spawn if the file is larger than this at spawn time.
|
||
DEFAULT_LOG_ROTATE_BYTES = 2 * 1024 * 1024 # 2 MiB
|
||
|
||
|
||
@dataclass
|
||
class DispatchResult:
|
||
"""Outcome of a single ``dispatch`` pass."""
|
||
|
||
reclaimed: int = 0
|
||
promoted: int = 0
|
||
spawned: list[tuple[str, str, str]] = field(default_factory=list)
|
||
"""List of ``(task_id, assignee, workspace_path)`` triples."""
|
||
skipped_unassigned: list[str] = field(default_factory=list)
|
||
crashed: list[str] = field(default_factory=list)
|
||
"""Task ids reclaimed because their worker PID disappeared."""
|
||
auto_blocked: list[str] = field(default_factory=list)
|
||
"""Task ids auto-blocked by the spawn-failure circuit breaker."""
|
||
timed_out: list[str] = field(default_factory=list)
|
||
"""Task ids whose workers exceeded ``max_runtime_seconds``."""
|
||
|
||
|
||
def _pid_alive(pid: Optional[int]) -> bool:
|
||
"""Return True if ``pid`` is still running on this host.
|
||
|
||
    POSIX-only probe: uses ``os.kill(pid, 0)``. On Windows the probe is
    skipped (``os.kill(pid, 0)`` there would terminate the target via
    TerminateProcess rather than just test it), so the claim TTL and
    heartbeats are the safety net. Returns False for falsy PIDs or on
    any OS error.
|
||
|
||
**Zombie handling (Linux):** ``os.kill(pid, 0)`` succeeds against
|
||
zombie processes (post-exit, pre-reap) because the process table
|
||
entry still exists. A worker that exits without being reaped by its
|
||
parent would stay "alive" to the dispatcher forever. Dispatcher
|
||
workers are started via ``start_new_session=True`` + intentional
|
||
Popen handle abandonment, so init reaps them quickly — but during
|
||
the window between exit and reap, we'd otherwise see stale "alive"
|
||
signals. On Linux we additionally peek at ``/proc/<pid>/status``
|
||
and treat ``State: Z`` as dead. On other POSIX or on Windows the
|
||
zombie check is a no-op.
|
||
"""
|
||
if not pid or pid <= 0:
|
||
return False
|
||
    if sys.platform == "win32":
        # os.kill(pid, 0) on Windows terminates the target (TerminateProcess)
        # instead of probing it; skip and let the claim TTL reclaim crashes.
        return True
    try:
        if hasattr(os, "kill"):
            os.kill(int(pid), 0)
|
||
except ProcessLookupError:
|
||
return False
|
||
except PermissionError:
|
||
# Process exists, we just can't signal it.
|
||
return True
|
||
except OSError:
|
||
return False
|
||
# Still here → kill(0) succeeded. Check for zombie on Linux.
|
||
if sys.platform == "linux":
|
||
try:
|
||
with open(f"/proc/{int(pid)}/status", "r") as f:
|
||
for line in f:
|
||
if line.startswith("State:"):
|
||
# "State:\tZ (zombie)" → dead
|
||
if "Z" in line.split(":", 1)[1]:
|
||
return False
|
||
break
|
||
except (FileNotFoundError, PermissionError, OSError):
|
||
# proc entry gone → already reaped; treat as dead.
|
||
# PermissionError shouldn't happen for our own children but
|
||
# be defensive.
|
||
pass
|
||
return True
|
||
|
||
|
||
def heartbeat_worker(
|
||
conn: sqlite3.Connection,
|
||
task_id: str,
|
||
*,
|
||
note: Optional[str] = None,
|
||
) -> bool:
|
||
"""Record a ``heartbeat`` event + touch ``last_heartbeat_at``.
|
||
|
||
Called by long-running workers as a liveness signal orthogonal to
|
||
the PID check. A worker that forks a long-lived child (train loop,
|
||
video encode, web crawl) can have its Python still alive while the
|
||
actual work process is stuck; periodic heartbeats catch that.
|
||
|
||
Returns True on success, False if the task is not in a state that
|
||
should be heartbeating (not running, or claim expired).
|
||
"""
|
||
now = int(time.time())
|
||
with write_txn(conn):
|
||
cur = conn.execute(
|
||
"UPDATE tasks SET last_heartbeat_at = ? "
|
||
"WHERE id = ? AND status = 'running'",
|
||
(now, task_id),
|
||
)
|
||
if cur.rowcount != 1:
|
||
return False
|
||
run_id = _current_run_id(conn, task_id)
|
||
if run_id is not None:
|
||
conn.execute(
|
||
"UPDATE task_runs SET last_heartbeat_at = ? WHERE id = ?",
|
||
(now, run_id),
|
||
)
|
||
_append_event(
|
||
conn, task_id, "heartbeat",
|
||
{"note": note} if note else None,
|
||
run_id=run_id,
|
||
)
|
||
return True
|
||
|
||
|
||
def enforce_max_runtime(
|
||
conn: sqlite3.Connection,
|
||
*,
|
||
signal_fn=None,
|
||
) -> list[str]:
|
||
"""Terminate workers whose per-task ``max_runtime_seconds`` has elapsed.
|
||
|
||
Sends SIGTERM, waits a short grace window, then SIGKILL. Emits a
|
||
``timed_out`` event and drops the task back to ``ready`` so the next
|
||
dispatcher tick re-spawns it — unless the spawn-failure circuit
|
||
breaker has already given up, in which case the task stays blocked
|
||
where ``_record_spawn_failure`` parked it.
|
||
|
||
Runs host-local: only tasks claimed by this host are candidates
|
||
(same reasoning as ``detect_crashed_workers``). ``signal_fn`` is a
|
||
test hook; defaults to ``os.kill`` on POSIX.
|
||
"""
|
||
import signal
|
||
timed_out: list[str] = []
|
||
now = int(time.time())
|
||
host_prefix = f"{_claimer_id().split(':', 1)[0]}:"
|
||
|
||
rows = conn.execute(
|
||
"SELECT id, worker_pid, started_at, max_runtime_seconds, claim_lock "
|
||
"FROM tasks "
|
||
"WHERE status = 'running' AND max_runtime_seconds IS NOT NULL "
|
||
" AND started_at IS NOT NULL AND worker_pid IS NOT NULL"
|
||
).fetchall()
|
||
for row in rows:
|
||
lock = row["claim_lock"] or ""
|
||
if not lock.startswith(host_prefix):
|
||
continue
|
||
elapsed = now - int(row["started_at"])
|
||
if elapsed < int(row["max_runtime_seconds"]):
|
||
continue
|
||
|
||
pid = int(row["worker_pid"])
|
||
tid = row["id"]
|
||
# SIGTERM then SIGKILL. Keep it simple: 5 s grace. Workers that
|
||
# want a cleaner shutdown can install their own SIGTERM handler
|
||
# before the grace expires.
|
||
killed = False
|
||
kill = signal_fn if signal_fn is not None else (
|
||
os.kill if hasattr(os, "kill") else None
|
||
)
|
||
if kill is not None:
|
||
try:
|
||
kill(pid, signal.SIGTERM)
|
||
except (ProcessLookupError, OSError):
|
||
pass
|
||
# Short polling wait — no time.sleep on the write txn.
|
||
for _ in range(10):
|
||
if not _pid_alive(pid):
|
||
break
|
||
time.sleep(0.5)
|
||
if _pid_alive(pid):
|
||
try:
|
||
kill(pid, signal.SIGKILL)
|
||
killed = True
|
||
except (ProcessLookupError, OSError):
|
||
pass
|
||
|
||
with write_txn(conn):
|
||
cur = conn.execute(
|
||
"UPDATE tasks SET status = 'ready', claim_lock = NULL, "
|
||
"claim_expires = NULL, worker_pid = NULL, "
|
||
"last_heartbeat_at = NULL "
|
||
"WHERE id = ? AND status = 'running'",
|
||
(tid,),
|
||
)
|
||
if cur.rowcount == 1:
|
||
payload = {
|
||
"pid": pid,
|
||
"elapsed_seconds": int(elapsed),
|
||
"limit_seconds": int(row["max_runtime_seconds"]),
|
||
"sigkill": killed,
|
||
}
|
||
run_id = _end_run(
|
||
conn, tid,
|
||
outcome="timed_out", status="timed_out",
|
||
error=f"elapsed {int(elapsed)}s > limit {int(row['max_runtime_seconds'])}s",
|
||
metadata=payload,
|
||
)
|
||
_append_event(
|
||
conn, tid, "timed_out", payload, run_id=run_id,
|
||
)
|
||
timed_out.append(tid)
|
||
return timed_out
|
||
|
||
|
||
def set_max_runtime(
|
||
conn: sqlite3.Connection,
|
||
task_id: str,
|
||
seconds: Optional[int],
|
||
) -> bool:
|
||
"""Set or clear the per-task max_runtime_seconds. Returns True on
|
||
success."""
|
||
with write_txn(conn):
|
||
cur = conn.execute(
|
||
"UPDATE tasks SET max_runtime_seconds = ? WHERE id = ?",
|
||
(int(seconds) if seconds is not None else None, task_id),
|
||
)
|
||
return cur.rowcount == 1
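
# Illustrative sketch (hypothetical id): capping a task's wall-clock budget.
# The dispatcher tick does the actual killing via enforce_max_runtime above.
#
#     set_max_runtime(conn, "task-0042", 2 * 3600)   # two-hour cap
#     set_max_runtime(conn, "task-0042", None)       # clear the cap again
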
def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]:
|
||
"""Reclaim ``running`` tasks whose worker PID is no longer alive.
|
||
|
||
Appends a ``crashed`` event and drops the task back to ``ready``.
|
||
Different from ``release_stale_claims``: this checks liveness
|
||
immediately rather than waiting for the claim TTL.
|
||
|
||
Only considers tasks claimed by *this host* — PIDs from other hosts
|
||
are meaningless here. The host-local check is enough because
|
||
``_default_spawn`` always runs the worker on the same host as the
|
||
dispatcher (the whole design is single-host).
|
||
"""
|
||
crashed: list[str] = []
|
||
with write_txn(conn):
|
||
rows = conn.execute(
|
||
"SELECT id, worker_pid, claim_lock FROM tasks "
|
||
"WHERE status = 'running' AND worker_pid IS NOT NULL"
|
||
).fetchall()
|
||
host_prefix = f"{_claimer_id().split(':', 1)[0]}:"
|
||
for row in rows:
|
||
# Only check liveness for claims owned by this host.
|
||
lock = row["claim_lock"] or ""
|
||
if not lock.startswith(host_prefix):
|
||
continue
|
||
if _pid_alive(row["worker_pid"]):
|
||
continue
|
||
cur = conn.execute(
|
||
"UPDATE tasks SET status = 'ready', claim_lock = NULL, "
|
||
"claim_expires = NULL, worker_pid = NULL "
|
||
"WHERE id = ? AND status = 'running'",
|
||
(row["id"],),
|
||
)
|
||
if cur.rowcount == 1:
|
||
run_id = _end_run(
|
||
conn, row["id"],
|
||
outcome="crashed", status="crashed",
|
||
error=f"pid {int(row['worker_pid'])} not alive",
|
||
metadata={
|
||
"pid": int(row["worker_pid"]),
|
||
"claimer": row["claim_lock"],
|
||
},
|
||
)
|
||
_append_event(
|
||
conn, row["id"], "crashed",
|
||
{"pid": int(row["worker_pid"]), "claimer": row["claim_lock"]},
|
||
run_id=run_id,
|
||
)
|
||
crashed.append(row["id"])
|
||
return crashed
|
||
|
||
|
||
def _record_spawn_failure(
|
||
conn: sqlite3.Connection,
|
||
task_id: str,
|
||
error: str,
|
||
*,
|
||
failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT,
|
||
) -> bool:
|
||
"""Release the claim, increment the failure counter, maybe auto-block.
|
||
|
||
Returns True when the task was auto-blocked (N failures exceeded),
|
||
False when it was just released back to ``ready`` for another try.
|
||
"""
|
||
blocked = False
|
||
with write_txn(conn):
|
||
row = conn.execute(
|
||
"SELECT spawn_failures FROM tasks WHERE id = ?", (task_id,),
|
||
).fetchone()
|
||
failures = int(row["spawn_failures"]) + 1 if row else 1
|
||
if failures >= failure_limit:
|
||
conn.execute(
|
||
"UPDATE tasks SET status = 'blocked', claim_lock = NULL, "
|
||
"claim_expires = NULL, worker_pid = NULL, "
|
||
"spawn_failures = ?, last_spawn_error = ? "
|
||
"WHERE id = ? AND status IN ('running', 'ready')",
|
||
(failures, error[:500], task_id),
|
||
)
|
||
run_id = _end_run(
|
||
conn, task_id,
|
||
outcome="gave_up", status="gave_up",
|
||
error=error[:500],
|
||
metadata={"failures": failures},
|
||
)
|
||
_append_event(
|
||
conn, task_id, "gave_up",
|
||
{"failures": failures, "error": error[:500]},
|
||
run_id=run_id,
|
||
)
|
||
blocked = True
|
||
else:
|
||
conn.execute(
|
||
"UPDATE tasks SET status = 'ready', claim_lock = NULL, "
|
||
"claim_expires = NULL, worker_pid = NULL, "
|
||
"spawn_failures = ?, last_spawn_error = ? "
|
||
"WHERE id = ? AND status = 'running'",
|
||
(failures, error[:500], task_id),
|
||
)
|
||
run_id = _end_run(
|
||
conn, task_id,
|
||
outcome="spawn_failed", status="spawn_failed",
|
||
error=error[:500],
|
||
metadata={"failures": failures},
|
||
)
|
||
_append_event(
|
||
conn, task_id, "spawn_failed",
|
||
{"error": error[:500], "failures": failures},
|
||
run_id=run_id,
|
||
)
|
||
return blocked
|
||
|
||
|
||
def _set_worker_pid(conn: sqlite3.Connection, task_id: str, pid: int) -> None:
|
||
"""Record the spawned child's pid + emit a ``spawned`` event.
|
||
|
||
The event's payload carries the pid so a human reading ``hermes kanban
|
||
tail`` can correlate log lines with OS-level traces without opening
|
||
the drawer.
|
||
"""
|
||
with write_txn(conn):
|
||
conn.execute(
|
||
"UPDATE tasks SET worker_pid = ? WHERE id = ?",
|
||
(int(pid), task_id),
|
||
)
|
||
run_id = _current_run_id(conn, task_id)
|
||
if run_id is not None:
|
||
conn.execute(
|
||
"UPDATE task_runs SET worker_pid = ? WHERE id = ?",
|
||
(int(pid), run_id),
|
||
)
|
||
_append_event(conn, task_id, "spawned", {"pid": int(pid)}, run_id=run_id)
def _clear_spawn_failures(conn: sqlite3.Connection, task_id: str) -> None:
    """Reset the failure counter after a successful spawn."""
    with write_txn(conn):
        conn.execute(
            "UPDATE tasks SET spawn_failures = 0, last_spawn_error = NULL "
            "WHERE id = ?",
            (task_id,),
        )

def dispatch_once(
|
||
conn: sqlite3.Connection,
|
||
*,
|
||
spawn_fn=None,
|
||
ttl_seconds: int = DEFAULT_CLAIM_TTL_SECONDS,
|
||
dry_run: bool = False,
|
||
max_spawn: Optional[int] = None,
|
||
failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT,
|
||
board: Optional[str] = None,
|
||
) -> DispatchResult:
|
||
"""Run one dispatcher tick.
|
||
|
||
Steps:
|
||
1. Reclaim stale running tasks (TTL expired).
|
||
2. Reclaim crashed running tasks (host-local PID no longer alive).
|
||
3. Promote todo -> ready where all parents are done.
|
||
4. For each ready task with an assignee, atomically claim and call
|
||
``spawn_fn(task, workspace_path, board) -> Optional[int]``. The
|
||
return value (if any) is recorded as ``worker_pid`` so subsequent
|
||
ticks can detect crashes before the TTL expires.
|
||
|
||
Spawn failures are counted per-task. After ``failure_limit`` consecutive
|
||
failures the task is auto-blocked with the last error as its reason —
|
||
prevents the dispatcher from thrashing forever on an unfixable task.
|
||
|
||
``spawn_fn`` defaults to ``_default_spawn``. Tests pass a stub.
|
||
``board`` pins workspace/log/db resolution for this tick to a specific
|
||
board. When omitted, the current-board resolution chain is used.
|
||
"""
|
||
result = DispatchResult()
|
||
result.reclaimed = release_stale_claims(conn)
|
||
result.crashed = detect_crashed_workers(conn)
|
||
result.timed_out = enforce_max_runtime(conn)
|
||
result.promoted = recompute_ready(conn)
|
||
|
||
ready_rows = conn.execute(
|
||
"SELECT id, assignee FROM tasks "
|
||
"WHERE status = 'ready' AND claim_lock IS NULL "
|
||
"ORDER BY priority DESC, created_at ASC"
|
||
).fetchall()
|
||
spawned = 0
|
||
for row in ready_rows:
|
||
if max_spawn is not None and spawned >= max_spawn:
|
||
break
|
||
if not row["assignee"]:
|
||
result.skipped_unassigned.append(row["id"])
|
||
continue
|
||
if dry_run:
|
||
result.spawned.append((row["id"], row["assignee"], ""))
|
||
continue
|
||
claimed = claim_task(conn, row["id"], ttl_seconds=ttl_seconds)
|
||
if claimed is None:
|
||
continue
|
||
try:
|
||
workspace = resolve_workspace(claimed, board=board)
|
||
except Exception as exc:
|
||
auto = _record_spawn_failure(
|
||
conn, claimed.id, f"workspace: {exc}",
|
||
failure_limit=failure_limit,
|
||
)
|
||
if auto:
|
||
result.auto_blocked.append(claimed.id)
|
||
continue
|
||
# Persist the resolved workspace path so the worker can cd there.
|
||
set_workspace_path(conn, claimed.id, str(workspace))
|
||
_spawn = spawn_fn if spawn_fn is not None else _default_spawn
|
||
try:
|
||
# Back-compat: older spawn_fn signatures accept only
|
||
# (task, workspace). Test stubs in the suite rely on that.
|
||
# Introspect the callable and pass `board` only when supported.
|
||
import inspect
|
||
try:
|
||
sig = inspect.signature(_spawn)
|
||
if "board" in sig.parameters:
|
||
pid = _spawn(claimed, str(workspace), board=board)
|
||
else:
|
||
pid = _spawn(claimed, str(workspace))
|
||
except (TypeError, ValueError):
|
||
pid = _spawn(claimed, str(workspace))
|
||
if pid:
|
||
_set_worker_pid(conn, claimed.id, int(pid))
|
||
_clear_spawn_failures(conn, claimed.id)
|
||
result.spawned.append((claimed.id, claimed.assignee or "", str(workspace)))
|
||
spawned += 1
|
||
except Exception as exc:
|
||
auto = _record_spawn_failure(
|
||
conn, claimed.id, str(exc),
|
||
failure_limit=failure_limit,
|
||
)
|
||
if auto:
|
||
result.auto_blocked.append(claimed.id)
|
||
return result
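
# Illustrative sketch: a single tick with a stub spawn function, the same
# shape the test suite uses (no real `hermes` subprocess is started).
#
#     def fake_spawn(task, workspace):
#         print("would spawn", task.id, "in", workspace)
#         return os.getpid()        # any live PID; recorded as worker_pid
#
#     with contextlib.closing(connect()) as conn:
#         res = dispatch_once(conn, spawn_fn=fake_spawn, max_spawn=2)
#         for task_id, assignee, ws in res.spawned:
#             print(task_id, assignee, ws)
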
def _rotate_worker_log(log_path: Path, max_bytes: int) -> None:
|
||
"""Rotate ``<log>`` to ``<log>.1`` if it exceeds ``max_bytes``.
|
||
|
||
Single-generation rotation — one old file kept, newer one replaces it.
|
||
Keeps disk usage bounded while still giving the user a chance to grab
|
||
the prior run's output.
|
||
"""
|
||
try:
|
||
if not log_path.exists():
|
||
return
|
||
if log_path.stat().st_size <= max_bytes:
|
||
return
|
||
rotated = log_path.with_suffix(log_path.suffix + ".1")
|
||
try:
|
||
if rotated.exists():
|
||
rotated.unlink()
|
||
except OSError:
|
||
pass
|
||
log_path.rename(rotated)
|
||
except OSError:
|
||
pass
|
||
|
||
|
||
def _default_spawn(
|
||
task: Task,
|
||
workspace: str,
|
||
*,
|
||
board: Optional[str] = None,
|
||
) -> Optional[int]:
|
||
"""Fire-and-forget ``hermes -p <profile> chat -q ...`` subprocess.
|
||
|
||
Returns the spawned child's PID so the dispatcher can detect crashes
|
||
before the claim TTL expires. The child's completion is still observed
|
||
via the ``complete`` / ``block`` transitions the worker writes itself;
|
||
the PID check is a safety net for crashes, OOM kills, and Ctrl+C.
|
||
|
||
``board`` pins the child's kanban context to that board: the child's
|
||
``HERMES_KANBAN_DB`` / ``HERMES_KANBAN_BOARD`` / workspaces_root env
|
||
vars all resolve to the same board the dispatcher claimed the task
|
||
from. Workers cannot accidentally see other boards.
|
||
"""
|
||
import subprocess
|
||
if not task.assignee:
|
||
raise ValueError(f"task {task.id} has no assignee")
|
||
|
||
from hermes_cli.profiles import normalize_profile_name
|
||
|
||
profile_arg = normalize_profile_name(task.assignee)
|
||
|
||
prompt = f"work kanban task {task.id}"
|
||
env = dict(os.environ)
|
||
if task.tenant:
|
||
env["HERMES_TENANT"] = task.tenant
|
||
env["HERMES_KANBAN_TASK"] = task.id
|
||
env["HERMES_KANBAN_WORKSPACE"] = workspace
|
||
# Pin the shared board + workspaces root the dispatcher resolved, so
|
||
# that even when the worker activates a profile (`hermes -p <name>`
|
||
# rewrites HERMES_HOME), its kanban paths still match the
|
||
# dispatcher's. Belt-and-braces with the `get_default_hermes_root()`
|
||
# resolution in `kanban_home()` — symmetric resolution is the norm,
|
||
# but unusual symlink / Docker layouts are caught here too.
|
||
env["HERMES_KANBAN_DB"] = str(kanban_db_path(board=board))
|
||
env["HERMES_KANBAN_WORKSPACES_ROOT"] = str(workspaces_root(board=board))
|
||
# Board slug — the final defense-in-depth pin. If the worker ever
|
||
# resolves kanban paths without the DB / workspaces env vars, the
|
||
# board slug still forces it to the right directory.
|
||
resolved_board = _normalize_board_slug(board) or get_current_board()
|
||
env["HERMES_KANBAN_BOARD"] = resolved_board
|
||
# HERMES_PROFILE is the author the kanban_comment tool defaults to.
|
||
# `hermes -p <assignee>` activates the profile, but the env var is
|
||
# what the tool reads — set it explicitly here so comments are
|
||
# attributed correctly regardless of how the child loads config.
|
||
env["HERMES_PROFILE"] = profile_arg
|
||
|
||
cmd = [
|
||
"hermes",
|
||
"-p", profile_arg,
|
||
# Auto-load the kanban-worker skill so every dispatched worker
|
||
# has the pattern library (good summary/metadata shapes, retry
|
||
# diagnostics, block-reason examples) in its context, even if
|
||
# the profile hasn't wired it into skills config. The MANDATORY
|
||
# lifecycle is already in the system prompt via KANBAN_GUIDANCE;
|
||
# this skill is the deeper reference. Users can point a profile
|
||
# at a different/additional skill via config if they want —
|
||
# --skills is additive to the profile's default skill set.
|
||
"--skills", "kanban-worker",
|
||
]
|
||
# Per-task force-loaded skills. Each name goes in its own
|
||
# `--skills X` pair rather than a single comma-joined arg: the CLI
|
||
# accepts both forms (action='append' + comma-split), but
|
||
# per-name pairs are easier to read in `ps` output and avoid any
|
||
# quoting ambiguity if a skill name ever contains unusual chars.
|
||
# Dedupe against the built-in so we don't double-load kanban-worker
|
||
# if a task author asks for it explicitly.
|
||
if task.skills:
|
||
for sk in task.skills:
|
||
if sk and sk != "kanban-worker":
|
||
cmd.extend(["--skills", sk])
|
||
cmd.extend([
|
||
"chat",
|
||
"-q", prompt,
|
||
])
|
||
# Redirect output to a per-task log under <board-root>/logs/.
|
||
# Anchored at the board root (not the shared kanban root), so
|
||
# `hermes kanban log` on a specific board reads its own file and
|
||
# logs don't collide across boards that happen to share task ids.
|
||
log_dir = worker_logs_dir(board=board)
|
||
log_dir.mkdir(parents=True, exist_ok=True)
|
||
log_path = log_dir / f"{task.id}.log"
|
||
_rotate_worker_log(log_path, DEFAULT_LOG_ROTATE_BYTES)
|
||
|
||
# Use 'a' so a re-run on unblock appends rather than overwrites.
|
||
log_f = open(log_path, "ab")
|
||
try:
|
||
proc = subprocess.Popen( # noqa: S603 -- argv is a fixed list built above
|
||
cmd,
|
||
cwd=workspace if os.path.isdir(workspace) else None,
|
||
stdin=subprocess.DEVNULL,
|
||
stdout=log_f,
|
||
stderr=subprocess.STDOUT,
|
||
env=env,
|
||
start_new_session=True,
|
||
)
|
||
except FileNotFoundError:
|
||
log_f.close()
|
||
raise RuntimeError(
|
||
"`hermes` executable not found on PATH. "
|
||
"Install Hermes Agent or activate its venv before running the kanban dispatcher."
|
||
)
|
||
# NOTE: we intentionally do NOT close log_f here — we want Popen's
|
||
# child process to keep writing after this function returns. The
|
||
# handle is kept alive by the child's inheritance. The parent's
|
||
# reference goes out of scope and is GC'd, but the OS-level FD stays
|
||
# open in the child until the child exits.
|
||
return proc.pid
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Long-lived dispatcher daemon
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def run_daemon(
|
||
*,
|
||
interval: float = 60.0,
|
||
max_spawn: Optional[int] = None,
|
||
failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT,
|
||
stop_event=None,
|
||
on_tick=None,
|
||
) -> None:
|
||
"""Run the dispatcher in a loop until interrupted.
|
||
|
||
Calls :func:`dispatch_once` every ``interval`` seconds. Exits cleanly
|
||
on SIGINT / SIGTERM so ``hermes kanban daemon`` is systemd-friendly.
|
||
``stop_event`` (a :class:`threading.Event`) and ``on_tick`` (a
|
||
callable receiving the :class:`DispatchResult`) are test hooks.
|
||
"""
|
||
import signal
|
||
import threading
|
||
|
||
if stop_event is None:
|
||
stop_event = threading.Event()
|
||
|
||
def _handle(_signum, _frame):
|
||
stop_event.set()
|
||
|
||
# Install handlers only when running on the main thread — tests call
|
||
# this inline from worker threads and signal() would raise there.
|
||
if threading.current_thread() is threading.main_thread():
|
||
for sig_name in ("SIGINT", "SIGTERM"):
|
||
sig = getattr(signal, sig_name, None)
|
||
if sig is not None:
|
||
try:
|
||
signal.signal(sig, _handle)
|
||
except (ValueError, OSError):
|
||
pass
|
||
|
||
while not stop_event.is_set():
|
||
try:
|
||
with contextlib.closing(connect()) as conn:
|
||
res = dispatch_once(
|
||
conn,
|
||
max_spawn=max_spawn,
|
||
failure_limit=failure_limit,
|
||
)
|
||
if on_tick is not None:
|
||
try:
|
||
on_tick(res)
|
||
except Exception:
|
||
pass
|
||
except Exception:
|
||
# Don't let any single tick kill the daemon.
|
||
import traceback
|
||
traceback.print_exc()
|
||
stop_event.wait(timeout=interval)
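
# Illustrative sketch: driving the daemon from a background thread using the
# stop_event / on_tick test hooks documented above.
#
#     import threading
#     stop = threading.Event()
#     t = threading.Thread(
#         target=run_daemon,
#         kwargs={"interval": 0.5, "stop_event": stop, "on_tick": print},
#         daemon=True,
#     )
#     t.start()
#     ...            # observe the board, make assertions, etc.
#     stop.set()
#     t.join()
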
# ---------------------------------------------------------------------------
|
||
# Worker context builder (what a spawned worker sees)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def build_worker_context(conn: sqlite3.Connection, task_id: str) -> str:
|
||
"""Return the full text a worker should read to understand its task.
|
||
|
||
Order:
|
||
1. Task title (mandatory).
|
||
2. Task body (optional opening post, capped at 8 KB).
|
||
3. Prior attempts on THIS task (most recent ``_CTX_MAX_PRIOR_ATTEMPTS``
|
||
shown; older attempts collapsed into a one-line summary).
|
||
Each attempt's ``summary`` / ``error`` / ``metadata`` capped at
|
||
``_CTX_MAX_FIELD_BYTES`` each.
|
||
4. Structured handoff results of every done parent task. Prefers
|
||
``run.summary`` / ``run.metadata`` when the parent was executed
|
||
via a run; falls back to ``task.result`` for older data. Same
|
||
per-field cap.
|
||
5. Cross-task role history for the assignee (most recent 5
|
||
completed runs on other tasks).
|
||
6. Comment thread (most recent ``_CTX_MAX_COMMENTS`` shown, older
|
||
collapsed).
|
||
|
||
All caps exist so worker prompts stay bounded even on pathological
|
||
boards (retry-heavy tasks, comment storms). The per-field char cap
|
||
prevents a single 1 MB summary from dominating context.
|
||
"""
|
||
task = get_task(conn, task_id)
|
||
if not task:
|
||
raise ValueError(f"unknown task {task_id}")
|
||
|
||
def _cap(s: Optional[str], limit: int = _CTX_MAX_FIELD_BYTES) -> str:
|
||
"""Truncate a string to `limit` chars with a visible ellipsis."""
|
||
if not s:
|
||
return ""
|
||
s = s.strip()
|
||
if len(s) <= limit:
|
||
return s
|
||
return s[:limit] + f"… [truncated, {len(s) - limit} chars omitted]"
|
||
|
||
lines: list[str] = []
|
||
lines.append(f"# Kanban task {task.id}: {task.title}")
|
||
lines.append("")
|
||
lines.append(f"Assignee: {task.assignee or '(unassigned)'}")
|
||
lines.append(f"Status: {task.status}")
|
||
if task.tenant:
|
||
lines.append(f"Tenant: {task.tenant}")
|
||
lines.append(f"Workspace: {task.workspace_kind} @ {task.workspace_path or '(unresolved)'}")
|
||
lines.append("")
|
||
|
||
if task.body and task.body.strip():
|
||
lines.append("## Body")
|
||
lines.append(_cap(task.body, _CTX_MAX_BODY_BYTES))
|
||
lines.append("")
|
||
|
||
# Prior attempts — show closed runs so a retrying worker sees the
|
||
# history. Skip the currently-active run (that's this worker).
|
||
# Cap at _CTX_MAX_PRIOR_ATTEMPTS most-recent closed runs; older
|
||
# attempts get collapsed into a one-line marker so the worker knows
|
||
# more exist without bloating the prompt.
|
||
all_prior = [r for r in list_runs(conn, task_id) if r.ended_at is not None]
|
||
# list_runs returns ascending by started_at; "most recent" = last N
|
||
if len(all_prior) > _CTX_MAX_PRIOR_ATTEMPTS:
|
||
omitted = len(all_prior) - _CTX_MAX_PRIOR_ATTEMPTS
|
||
shown = all_prior[-_CTX_MAX_PRIOR_ATTEMPTS:]
|
||
first_shown_idx = omitted + 1
|
||
else:
|
||
omitted = 0
|
||
shown = all_prior
|
||
first_shown_idx = 1
|
||
if shown:
|
||
lines.append("## Prior attempts on this task")
|
||
if omitted:
|
||
lines.append(
|
||
f"_({omitted} earlier attempt{'s' if omitted != 1 else ''} "
|
||
f"omitted; showing most recent {len(shown)})_"
|
||
)
|
||
for offset, run in enumerate(shown):
|
||
idx = first_shown_idx + offset
|
||
ts = time.strftime("%Y-%m-%d %H:%M", time.localtime(run.started_at))
|
||
profile = run.profile or "(unknown)"
|
||
outcome = run.outcome or run.status
|
||
lines.append(f"### Attempt {idx} — {outcome} ({profile}, {ts})")
|
||
if run.summary and run.summary.strip():
|
||
lines.append(_cap(run.summary))
|
||
if run.error and run.error.strip():
|
||
lines.append(f"_error_: {_cap(run.error)}")
|
||
if run.metadata:
|
||
try:
|
||
meta_str = json.dumps(run.metadata, ensure_ascii=False, sort_keys=True)
|
||
lines.append(f"_metadata_: `{_cap(meta_str)}`")
|
||
except Exception:
|
||
pass
|
||
lines.append("")
|
||
|
||
# Parents: prefer the most-recent 'completed' run's summary + metadata,
|
||
# fall back to ``task.result`` when no run rows exist (legacy DBs,
|
||
# or tasks completed before the runs table landed).
|
||
parent_rows = conn.execute(
|
||
"SELECT parent_id FROM task_links WHERE child_id = ? ORDER BY parent_id",
|
||
(task_id,),
|
||
).fetchall()
|
||
parent_ids = [r["parent_id"] for r in parent_rows]
|
||
|
||
if parent_ids:
|
||
wrote_header = False
|
||
for pid in parent_ids:
|
||
pt = get_task(conn, pid)
|
||
if not pt or pt.status != "done":
|
||
continue
|
||
runs = [r for r in list_runs(conn, pid) if r.outcome == "completed"]
|
||
runs.sort(key=lambda r: r.started_at, reverse=True)
|
||
run = runs[0] if runs else None
|
||
|
||
if not wrote_header:
|
||
lines.append("## Parent task results")
|
||
wrote_header = True
|
||
lines.append(f"### {pid}")
|
||
|
||
body_lines: list[str] = []
|
||
if run is not None and run.summary and run.summary.strip():
|
||
body_lines.append(_cap(run.summary))
|
||
elif pt.result:
|
||
body_lines.append(_cap(pt.result))
|
||
else:
|
||
body_lines.append("(no result recorded)")
|
||
|
||
if run is not None and run.metadata:
|
||
try:
|
||
meta_str = json.dumps(run.metadata, ensure_ascii=False, sort_keys=True)
|
||
body_lines.append(f"_metadata_: `{_cap(meta_str)}`")
|
||
except Exception:
|
||
pass
|
||
lines.extend(body_lines)
|
||
lines.append("")
|
||
|
||
# Cross-task role history: what else has THIS assignee completed
|
||
# recently? Gives the worker implicit continuity — "I'm the reviewer
|
||
# and my last three reviews focused on security" — without forcing
|
||
# the user to wire anything into SOUL.md / MEMORY.md. Bounded to the
|
||
# most recent 5 completed runs, excluding this task so the retry
|
||
# section above isn't duplicated. Safe on assignee=None (skipped).
|
||
if task.assignee:
|
||
role_rows = conn.execute(
|
||
"SELECT t.id, t.title, r.summary, r.ended_at "
|
||
"FROM task_runs r JOIN tasks t ON r.task_id = t.id "
|
||
"WHERE r.profile = ? AND r.task_id != ? "
|
||
" AND r.outcome = 'completed' "
|
||
"ORDER BY r.ended_at DESC LIMIT 5",
|
||
(task.assignee, task_id),
|
||
).fetchall()
|
||
if role_rows:
|
||
lines.append(f"## Recent work by @{task.assignee}")
|
||
for row in role_rows:
|
||
ts = time.strftime(
|
||
"%Y-%m-%d %H:%M", time.localtime(int(row["ended_at"]))
|
||
)
|
||
s = (row["summary"] or "").strip().splitlines()
|
||
first = s[0][:200] if s else "(no summary)"
|
||
lines.append(f"- {row['id']} — {row['title']} ({ts}): {first}")
|
||
lines.append("")
|
||
|
||
# Comments: cap at the most-recent _CTX_MAX_COMMENTS so
|
||
# comment-storm tasks don't blow out the worker's prompt. Older
|
||
# comments summarised in a one-line marker like prior attempts.
|
||
all_comments = list_comments(conn, task_id)
|
||
if len(all_comments) > _CTX_MAX_COMMENTS:
|
||
omitted_c = len(all_comments) - _CTX_MAX_COMMENTS
|
||
shown_c = all_comments[-_CTX_MAX_COMMENTS:]
|
||
else:
|
||
omitted_c = 0
|
||
shown_c = all_comments
|
||
if shown_c:
|
||
lines.append("## Comment thread")
|
||
if omitted_c:
|
||
lines.append(
|
||
f"_({omitted_c} earlier comment{'s' if omitted_c != 1 else ''} "
|
||
f"omitted; showing most recent {len(shown_c)})_"
|
||
)
|
||
for c in shown_c:
|
||
ts = time.strftime("%Y-%m-%d %H:%M", time.localtime(c.created_at))
|
||
lines.append(f"**{c.author}** ({ts}):")
|
||
lines.append(_cap(c.body, _CTX_MAX_COMMENT_BYTES))
|
||
lines.append("")
|
||
|
||
return "\n".join(lines).rstrip() + "\n"
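
# Illustrative sketch (hypothetical id): what a spawned worker, or a curious
# human, runs to read the full briefing for a task.
#
#     with contextlib.closing(connect()) as conn:
#         print(build_worker_context(conn, "task-0042"))
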
# ---------------------------------------------------------------------------
|
||
# Stats + SLA helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def board_stats(conn: sqlite3.Connection) -> dict:
|
||
"""Per-status + per-assignee counts, plus the oldest ``ready`` age in
|
||
seconds (the clearest staleness signal for a router or HUD).
|
||
"""
|
||
by_status: dict[str, int] = {}
|
||
for row in conn.execute(
|
||
"SELECT status, COUNT(*) AS n FROM tasks "
|
||
"WHERE status != 'archived' GROUP BY status"
|
||
):
|
||
by_status[row["status"]] = int(row["n"])
|
||
|
||
by_assignee: dict[str, dict[str, int]] = {}
|
||
for row in conn.execute(
|
||
"SELECT assignee, status, COUNT(*) AS n FROM tasks "
|
||
"WHERE status != 'archived' AND assignee IS NOT NULL "
|
||
"GROUP BY assignee, status"
|
||
):
|
||
by_assignee.setdefault(row["assignee"], {})[row["status"]] = int(row["n"])
|
||
|
||
oldest_row = conn.execute(
|
||
"SELECT MIN(created_at) AS ts FROM tasks WHERE status = 'ready'"
|
||
).fetchone()
|
||
now = int(time.time())
|
||
oldest_ready_age = (
|
||
(now - int(oldest_row["ts"]))
|
||
if oldest_row and oldest_row["ts"] is not None else None
|
||
)
|
||
|
||
return {
|
||
"by_status": by_status,
|
||
"by_assignee": by_assignee,
|
||
"oldest_ready_age_seconds": oldest_ready_age,
|
||
"now": now,
|
||
}
|
||
|
||
|
||
def task_age(task: Task) -> dict:
|
||
"""Return age metrics for a single task. All values are seconds or None."""
|
||
now = int(time.time())
|
||
age_since_created = now - int(task.created_at) if task.created_at else None
|
||
age_since_started = (
|
||
now - int(task.started_at) if task.started_at else None
|
||
)
|
||
time_to_complete = (
|
||
int(task.completed_at) - int(task.started_at or task.created_at)
|
||
if task.completed_at else None
|
||
)
|
||
return {
|
||
"created_age_seconds": age_since_created,
|
||
"started_age_seconds": age_since_started,
|
||
"time_to_complete_seconds": time_to_complete,
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Notification subscriptions (used by the gateway kanban-notifier)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def add_notify_sub(
|
||
conn: sqlite3.Connection,
|
||
*,
|
||
task_id: str,
|
||
platform: str,
|
||
chat_id: str,
|
||
thread_id: Optional[str] = None,
|
||
user_id: Optional[str] = None,
|
||
) -> None:
|
||
"""Register a gateway source that wants terminal-state notifications
|
||
for ``task_id``. Idempotent on (task, platform, chat, thread)."""
|
||
now = int(time.time())
|
||
with write_txn(conn):
|
||
conn.execute(
|
||
"""
|
||
INSERT OR IGNORE INTO kanban_notify_subs
|
||
(task_id, platform, chat_id, thread_id, user_id, created_at)
|
||
VALUES (?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(task_id, platform, chat_id, thread_id or "", user_id, now),
|
||
)
|
||
|
||
|
||
def list_notify_subs(
|
||
conn: sqlite3.Connection, task_id: Optional[str] = None,
|
||
) -> list[dict]:
|
||
if task_id is not None:
|
||
rows = conn.execute(
|
||
"SELECT * FROM kanban_notify_subs WHERE task_id = ?", (task_id,),
|
||
).fetchall()
|
||
else:
|
||
rows = conn.execute("SELECT * FROM kanban_notify_subs").fetchall()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
def remove_notify_sub(
|
||
conn: sqlite3.Connection,
|
||
*,
|
||
task_id: str,
|
||
platform: str,
|
||
chat_id: str,
|
||
thread_id: Optional[str] = None,
|
||
) -> bool:
|
||
with write_txn(conn):
|
||
cur = conn.execute(
|
||
"DELETE FROM kanban_notify_subs WHERE task_id = ? "
|
||
"AND platform = ? AND chat_id = ? AND thread_id = ?",
|
||
(task_id, platform, chat_id, thread_id or ""),
|
||
)
|
||
return cur.rowcount > 0
|
||
|
||
|
||
def unseen_events_for_sub(
|
||
conn: sqlite3.Connection,
|
||
*,
|
||
task_id: str,
|
||
platform: str,
|
||
chat_id: str,
|
||
thread_id: Optional[str] = None,
|
||
kinds: Optional[Iterable[str]] = None,
|
||
) -> tuple[int, list[Event]]:
|
||
"""Return ``(new_cursor, events)`` for a given subscription.
|
||
|
||
Only events with ``id > last_event_id`` are returned. The subscription's
|
||
cursor is NOT advanced here; call :func:`advance_notify_cursor` after
|
||
the gateway has successfully delivered the notifications.
|
||
"""
|
||
row = conn.execute(
|
||
"SELECT last_event_id FROM kanban_notify_subs "
|
||
"WHERE task_id = ? AND platform = ? AND chat_id = ? AND thread_id = ?",
|
||
(task_id, platform, chat_id, thread_id or ""),
|
||
).fetchone()
|
||
if row is None:
|
||
return 0, []
|
||
cursor = int(row["last_event_id"])
|
||
kind_list = list(kinds) if kinds else None
|
||
q = (
|
||
"SELECT * FROM task_events WHERE task_id = ? AND id > ? "
|
||
+ ("AND kind IN (" + ",".join("?" * len(kind_list)) + ") " if kind_list else "")
|
||
+ "ORDER BY id ASC"
|
||
)
|
||
params: list[Any] = [task_id, cursor]
|
||
if kind_list:
|
||
params.extend(kind_list)
|
||
rows = conn.execute(q, params).fetchall()
|
||
out: list[Event] = []
|
||
max_id = cursor
|
||
for r in rows:
|
||
try:
|
||
payload = json.loads(r["payload"]) if r["payload"] else None
|
||
except Exception:
|
||
payload = None
|
||
out.append(Event(
|
||
id=r["id"], task_id=r["task_id"], kind=r["kind"],
|
||
payload=payload, created_at=r["created_at"],
|
||
run_id=(int(r["run_id"]) if "run_id" in r.keys() and r["run_id"] is not None else None),
|
||
))
|
||
max_id = max(max_id, int(r["id"]))
|
||
return max_id, out
|
||
|
||
|
||
def advance_notify_cursor(
|
||
conn: sqlite3.Connection,
|
||
*,
|
||
task_id: str,
|
||
platform: str,
|
||
chat_id: str,
|
||
thread_id: Optional[str] = None,
|
||
new_cursor: int,
|
||
) -> None:
|
||
with write_txn(conn):
|
||
conn.execute(
|
||
"UPDATE kanban_notify_subs SET last_event_id = ? "
|
||
"WHERE task_id = ? AND platform = ? AND chat_id = ? AND thread_id = ?",
|
||
(int(new_cursor), task_id, platform, chat_id, thread_id or ""),
|
||
)
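
# Illustrative sketch of the gateway notifier loop (platform / chat ids are
# hypothetical, and the subscription is assumed to exist via add_notify_sub).
# Deliver first, advance the cursor only after delivery succeeds, so a crash
# mid-delivery re-sends instead of dropping events.
#
#     cursor, events = unseen_events_for_sub(
#         conn, task_id="task-0042", platform="telegram", chat_id="12345",
#         kinds=("completed", "blocked", "gave_up"),
#     )
#     for ev in events:
#         deliver_to_chat(ev)        # hypothetical delivery helper
#     if events:
#         advance_notify_cursor(
#             conn, task_id="task-0042", platform="telegram", chat_id="12345",
#             new_cursor=cursor,
#         )
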
# ---------------------------------------------------------------------------
|
||
# Retention + garbage collection
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def gc_events(
    conn: sqlite3.Connection, *, older_than_seconds: int = 30 * 24 * 3600,
) -> int:
    """Delete task_events rows older than ``older_than_seconds`` for tasks
    in a terminal state (``done`` or ``archived``). Returns the number of
    rows deleted. Running / ready / blocked tasks keep their full event
    history."""
    cutoff = int(time.time()) - int(older_than_seconds)
    with write_txn(conn):
        cur = conn.execute(
            "DELETE FROM task_events WHERE created_at < ? AND task_id IN "
            "(SELECT id FROM tasks WHERE status IN ('done', 'archived'))",
            (cutoff,),
        )
        return int(cur.rowcount or 0)


def gc_worker_logs(
    *, older_than_seconds: int = 30 * 24 * 3600,
    board: Optional[str] = None,
) -> int:
    """Delete worker log files older than ``older_than_seconds``. Returns
    the number of files removed. Kept separate from ``gc_events`` because
    log files live on disk, not in SQLite. Scoped to ``board`` (defaults
    to the active board) — per-board isolation means deleting logs from
    board A cannot touch board B's logs."""
    log_dir = worker_logs_dir(board=board)
    if not log_dir.exists():
        return 0
    cutoff = time.time() - older_than_seconds
    removed = 0
    for p in log_dir.iterdir():
        try:
            if p.is_file() and p.stat().st_mtime < cutoff:
                p.unlink()
                removed += 1
        except OSError:
            continue
    return removed


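# Illustrative sketch (not part of the dispatcher): a periodic maintenance
# tick that applies one retention window to both the events table and the
# on-disk worker logs. The 30-day default mirrors the function defaults; the
# ``board`` argument is forwarded so multi-board installs GC each board
# independently, while ``gc_events`` needs no board because ``conn`` is
# already scoped to a single board's DB.
def _example_retention_tick(
    conn: sqlite3.Connection,
    *,
    board: Optional[str] = None,
    older_than_seconds: int = 30 * 24 * 3600,
) -> dict:
    deleted_events = gc_events(conn, older_than_seconds=older_than_seconds)
    deleted_logs = gc_worker_logs(
        older_than_seconds=older_than_seconds, board=board,
    )
    return {"events": deleted_events, "log_files": deleted_logs}

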
# ---------------------------------------------------------------------------
# Worker log accessor
# ---------------------------------------------------------------------------

def worker_log_path(task_id: str, *, board: Optional[str] = None) -> Path:
    """Return the path to a worker's log file. The file may not exist
    (task never spawned, or log already GC'd).

    When ``board`` is None, resolves via the active board (env var →
    current-board file → default). The dispatcher always passes the
    board explicitly to avoid any resolution ambiguity when multiple
    boards exist."""
    return worker_logs_dir(board=board) / f"{task_id}.log"


def read_worker_log(
    task_id: str, *, tail_bytes: Optional[int] = None,
    board: Optional[str] = None,
) -> Optional[str]:
    """Read the worker log for ``task_id``. Returns None if the file
    doesn't exist. If ``tail_bytes`` is set, only the last N bytes are
    returned (useful for the dashboard drawer which shouldn't page megabytes)."""
    path = worker_log_path(task_id, board=board)
    if not path.exists():
        return None
    try:
        if tail_bytes is None:
            return path.read_text(encoding="utf-8", errors="replace")
        size = path.stat().st_size
        with open(path, "rb") as f:
            if size > tail_bytes:
                f.seek(size - tail_bytes)
                # Skip a partial line if we tailed mid-line. But if the
                # window has no newline at all (one giant log line),
                # readline() would eat everything — in that case don't
                # skip and return the raw tail.
                probe = f.tell()
                partial = f.readline()
                if not partial.endswith(b"\n") and f.tell() >= size:
                    f.seek(probe)
            data = f.read()
        return data.decode("utf-8", errors="replace")
    except OSError:
        return None


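# Illustrative sketch (not part of the dashboard code): rendering a log
# drawer for a task by tailing roughly the last 64 KiB rather than paging the
# whole file. The 64 KiB figure is an arbitrary example value, not a
# documented limit; ``read_worker_log`` returns None when the worker never
# wrote a log or the file has already been GC'd.
def _example_log_drawer_text(task_id: str, *, board: Optional[str] = None) -> str:
    text = read_worker_log(task_id, tail_bytes=64 * 1024, board=board)
    if text is None:
        return "(no worker log for this task)"
    return text

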
# ---------------------------------------------------------------------------
# Assignee enumeration (known profiles + per-profile board stats)
# ---------------------------------------------------------------------------

def list_profiles_on_disk() -> list[str]:
    """Return the set of named profiles discovered on disk.

    Reads ``~/.hermes/profiles/`` directly so this module has no import
    dependency on ``hermes_cli.profiles`` (which pulls in a large chunk
    of the CLI startup path). Only returns directories that contain a
    ``config.yaml`` — a bare dir without config isn't a real profile.
    """
    try:
        from hermes_constants import get_default_hermes_root
        home = get_default_hermes_root() / "profiles"
    except Exception:
        return []
    if not home.is_dir():
        return []
    names: list[str] = []
    try:
        for entry in sorted(home.iterdir()):
            if not entry.is_dir():
                continue
            if (entry / "config.yaml").is_file():
                names.append(entry.name)
    except OSError:
        return names
    return names


def known_assignees(conn: sqlite3.Connection) -> list[dict]:
    """Return every assignee name known to the board or on disk.

    Each entry is ``{"name": str, "on_disk": bool, "counts": {status: n}}``.
    A name is included when it's a configured profile on disk OR when
    any non-archived task has it as the assignee. Used by:

    - ``hermes kanban assignees`` for the terminal.
    - The dashboard assignee dropdown (so a fresh profile appears in
      the picker even before it's been given any task).
    - Router-profile heuristics ("who's overloaded?") without scanning
      the whole board.
    """
    on_disk = set(list_profiles_on_disk())

    # Count tasks per (assignee, status), excluding archived.
    counts: dict[str, dict[str, int]] = {}
    for row in conn.execute(
        "SELECT assignee, status, COUNT(*) AS n FROM tasks "
        "WHERE status != 'archived' AND assignee IS NOT NULL "
        "GROUP BY assignee, status"
    ):
        counts.setdefault(row["assignee"], {})[row["status"]] = int(row["n"])

    names = sorted(on_disk | set(counts.keys()))
    return [
        {
            "name": name,
            "on_disk": name in on_disk,
            "counts": counts.get(name, {}),
        }
        for name in names
    ]


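# Illustrative sketch (not part of the router): picking the least-loaded
# on-disk profile as an assignee. "Load" here is just the count of tasks in
# a few non-terminal statuses; the names in ``busy_statuses`` are an example
# subset taken from the gc_events docstring, not an exhaustive list of board
# states.
def _example_least_loaded_profile(conn: sqlite3.Connection) -> Optional[str]:
    busy_statuses = ("ready", "running", "blocked")  # assumed status subset
    best_name: Optional[str] = None
    best_load = None
    for entry in known_assignees(conn):
        if not entry["on_disk"]:
            continue  # only route to profiles that actually exist on disk
        load = sum(entry["counts"].get(s, 0) for s in busy_statuses)
        if best_load is None or load < best_load:
            best_name, best_load = entry["name"], load
    return best_name

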
# ---------------------------------------------------------------------------
# Runs (attempt history on a task)
# ---------------------------------------------------------------------------

def list_runs(
    conn: sqlite3.Connection,
    task_id: str,
    *,
    include_active: bool = True,
) -> list[Run]:
    """Return all runs for ``task_id`` in start order.

    ``include_active=True`` (default) includes the currently-running
    attempt if any. Set False to return only closed runs (useful for
    "how many prior attempts have there been?" checks).
    """
    q = "SELECT * FROM task_runs WHERE task_id = ?"
    params: list[Any] = [task_id]
    if not include_active:
        q += " AND ended_at IS NOT NULL"
    q += " ORDER BY started_at ASC, id ASC"
    rows = conn.execute(q, params).fetchall()
    return [Run.from_row(r) for r in rows]


def get_run(conn: sqlite3.Connection, run_id: int) -> Optional[Run]:
    """Return the run with ``run_id``, or None if no such run exists."""
    row = conn.execute(
        "SELECT * FROM task_runs WHERE id = ?", (int(run_id),),
    ).fetchone()
    return Run.from_row(row) if row else None


def active_run(conn: sqlite3.Connection, task_id: str) -> Optional[Run]:
    """Return the currently-open run for ``task_id`` (``ended_at IS NULL``)."""
    row = conn.execute(
        "SELECT * FROM task_runs WHERE task_id = ? AND ended_at IS NULL "
        "ORDER BY started_at DESC LIMIT 1",
        (task_id,),
    ).fetchone()
    return Run.from_row(row) if row else None


def latest_run(conn: sqlite3.Connection, task_id: str) -> Optional[Run]:
    """Return the most recent run regardless of outcome (active or closed)."""
    row = conn.execute(
        "SELECT * FROM task_runs WHERE task_id = ? "
        "ORDER BY started_at DESC, id DESC LIMIT 1",
        (task_id,),
    ).fetchone()
    return Run.from_row(row) if row else None
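

# Illustrative sketch (not part of the dispatcher): deciding whether a task
# may be retried based on its run history. The ``max_attempts`` threshold is
# an arbitrary example value; only closed runs count as prior attempts, and a
# task with an attempt still in flight is never retried.
def _example_can_retry(
    conn: sqlite3.Connection, task_id: str, *, max_attempts: int = 3,
) -> bool:
    if active_run(conn, task_id) is not None:
        return False  # an attempt is already running
    prior = list_runs(conn, task_id, include_active=False)
    return len(prior) < max_attempts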