feat(projects): add per-profile project store

This commit is contained in:
Brooklyn Nicholson 2026-06-25 16:40:26 -05:00
parent 4cdd1a3230
commit 8a45ce2dd4
5 changed files with 1369 additions and 0 deletions

335
hermes_cli/projects_cmd.py Normal file
View file

@ -0,0 +1,335 @@
"""``hermes project`` CLI — manage first-class, multi-folder Projects.
A Project is a human-named workspace spanning one or more folders, with one
designated primary repo. Projects anchor desktop session grouping and (when
bound to a kanban board) give kanban tasks a deterministic worktree + branch
convention. State lives in the per-profile ``$HERMES_HOME/projects.db`` store
(see :mod:`hermes_cli.projects_db`).
This is a footprint-ladder rung-2 capability: a CLI command + gateway RPC,
with zero model-tool schema cost.
"""
from __future__ import annotations
import argparse
import functools
import sys
from hermes_cli import projects_db as pdb
def build_parser(
parent_subparsers: argparse._SubParsersAction,
) -> argparse.ArgumentParser:
"""Attach the ``project`` subcommand tree. Returns the top parser."""
parser = parent_subparsers.add_parser(
"project",
help="Manage projects (named, multi-folder workspaces)",
description=(
"Projects are human-named workspaces that can span multiple "
"folders / repos. They anchor desktop session grouping and, when "
"bound to a kanban board, give tasks a deterministic worktree + "
"branch convention. State is per-profile."
),
)
sub = parser.add_subparsers(dest="project_action")
p_create = sub.add_parser("create", help="Create a new project")
p_create.add_argument("name", help="Human name, e.g. 'Hermes Agent'")
p_create.add_argument(
"folders", nargs="*", help="Folder paths to include (first = primary)"
)
p_create.add_argument("--slug", default=None, help="Explicit slug override")
p_create.add_argument(
"--primary", default=None, metavar="PATH", help="Primary repo path"
)
p_create.add_argument("--description", default=None)
p_create.add_argument("--icon", default=None)
p_create.add_argument("--color", default=None)
p_create.add_argument(
"--board", default=None, metavar="SLUG", help="Bind a kanban board"
)
p_create.add_argument(
"--use", action="store_true", help="Set as the active project"
)
p_list = sub.add_parser("list", aliases=["ls"], help="List projects")
p_list.add_argument(
"--all", action="store_true", dest="include_archived",
help="Include archived projects",
)
p_show = sub.add_parser("show", help="Show a project's details")
p_show.add_argument("project", help="Project id or slug")
p_add = sub.add_parser("add-folder", help="Add a folder to a project")
p_add.add_argument("project", help="Project id or slug")
p_add.add_argument("path", help="Folder path")
p_add.add_argument("--label", default=None)
p_add.add_argument(
"--primary", action="store_true", help="Mark as primary repo"
)
p_rm = sub.add_parser("remove-folder", help="Remove a folder from a project")
p_rm.add_argument("project", help="Project id or slug")
p_rm.add_argument("path", help="Folder path")
p_rename = sub.add_parser("rename", help="Rename a project")
p_rename.add_argument("project", help="Project id or slug")
p_rename.add_argument("name", help="New name")
p_primary = sub.add_parser("set-primary", help="Set the primary folder")
p_primary.add_argument("project", help="Project id or slug")
p_primary.add_argument("path", help="Folder path (must already be in project)")
p_use = sub.add_parser("use", help="Set the active project")
p_use.add_argument(
"project", nargs="?", default=None,
help="Project id or slug (omit to clear)",
)
p_archive = sub.add_parser("archive", help="Archive a project")
p_archive.add_argument("project", help="Project id or slug")
p_restore = sub.add_parser("restore", help="Restore an archived project")
p_restore.add_argument("project", help="Project id or slug")
p_bind = sub.add_parser("bind-board", help="Bind a kanban board to a project")
p_bind.add_argument("project", help="Project id or slug")
p_bind.add_argument(
"board", nargs="?", default="", help="Board slug (omit to unbind)"
)
parser.set_defaults(_project_parser=parser)
return parser
def projects_command(args: argparse.Namespace) -> int:
"""Entry point from ``hermes project …`` argparse dispatch."""
action = getattr(args, "project_action", None)
if not action:
parser = getattr(args, "_project_parser", None)
if parser is not None:
parser.print_help()
else:
print(
"usage: hermes project <action> [options]\n"
"Run 'hermes project --help' for the full list.",
file=sys.stderr,
)
return 0
handlers = {
"create": _cmd_create,
"list": _cmd_list,
"ls": _cmd_list,
"show": _cmd_show,
"add-folder": _cmd_add_folder,
"remove-folder": _cmd_remove_folder,
"rename": _cmd_rename,
"set-primary": _cmd_set_primary,
"use": _cmd_use,
"archive": _cmd_archive,
"restore": _cmd_restore,
"bind-board": _cmd_bind_board,
}
handler = handlers.get(action)
if handler is None:
print(f"Unknown project action: {action}", file=sys.stderr)
return 1
return handler(args)
def _resolve(conn, ident: str):
proj = pdb.get_project(conn, ident)
if proj is None:
print(f"project: no such project: {ident}", file=sys.stderr)
return proj
def _with_project(fn):
"""Open the DB, resolve ``args.project``, and run ``fn(args, conn, proj)``.
Collapses the connect / resolve / not-found(1) / bad-arg(2) boilerplate every
project-scoped subcommand repeated.
"""
@functools.wraps(fn)
def wrapper(args: argparse.Namespace) -> int:
with pdb.connect_closing() as conn:
proj = _resolve(conn, args.project)
if proj is None:
return 1
try:
return fn(args, conn, proj)
except ValueError as exc:
print(f"project: {exc}", file=sys.stderr)
return 2
return wrapper
def _print_project(proj) -> None:
flags = " (archived)" if proj.archived else ""
print(f"{proj.slug} [{proj.id}]{flags}")
print(f" name: {proj.name}")
if proj.description:
print(f" about: {proj.description}")
if proj.board_slug:
print(f" board: {proj.board_slug}")
if proj.primary_path:
print(f" primary: {proj.primary_path}")
if proj.folders:
print(" folders:")
for f in proj.folders:
mark = " *" if f.is_primary else " "
label = f" ({f.label})" if f.label else ""
print(f" {mark} {f.path}{label}")
def _cmd_create(args: argparse.Namespace) -> int:
try:
with pdb.connect_closing() as conn:
pid = pdb.create_project(
conn,
name=args.name,
slug=args.slug,
folders=args.folders,
primary_path=args.primary,
description=args.description,
icon=args.icon,
color=args.color,
board_slug=args.board,
)
if args.use:
pdb.set_active(conn, pid)
proj = pdb.get_project(conn, pid)
except ValueError as exc:
print(f"project: {exc}", file=sys.stderr)
return 2
if proj is None:
print("project: vanished after create", file=sys.stderr)
return 2
print(f"Created project {proj.slug} ({pid})")
_print_project(proj)
return 0
def _cmd_list(args: argparse.Namespace) -> int:
with pdb.connect_closing() as conn:
active = pdb.get_active_id(conn)
projs = pdb.list_projects(
conn, include_archived=getattr(args, "include_archived", False)
)
if not projs:
print("No projects yet. Create one with `hermes project create <name>`.")
return 0
for p in projs:
marker = "*" if p.id == active else " "
flags = " (archived)" if p.archived else ""
nfolders = len(p.folders)
print(f"{marker} {p.slug:<24} {p.name}{flags} [{nfolders} folder(s)]")
return 0
@_with_project
def _cmd_show(args, conn, proj) -> int:
_print_project(proj)
return 0
@_with_project
def _cmd_add_folder(args, conn, proj) -> int:
path = pdb.add_folder(conn, proj.id, args.path, label=args.label, is_primary=args.primary)
print(f"Added {path} to {proj.slug}")
return 0
@_with_project
def _cmd_remove_folder(args, conn, proj) -> int:
if not pdb.remove_folder(conn, proj.id, args.path):
print(f"project: folder not in project: {args.path}", file=sys.stderr)
return 1
print(f"Removed {args.path} from {proj.slug}")
return 0
@_with_project
def _cmd_rename(args, conn, proj) -> int:
pdb.update_project(conn, proj.id, name=args.name)
print(f"Renamed {proj.slug} -> {args.name}")
return 0
@_with_project
def _cmd_set_primary(args, conn, proj) -> int:
if not pdb.set_primary(conn, proj.id, args.path):
print(
f"project: '{args.path}' is not a folder of {proj.slug}; "
f"add it first with `hermes project add-folder`.",
file=sys.stderr,
)
return 1
print(f"Set primary of {proj.slug} -> {args.path}")
return 0
def _cmd_use(args: argparse.Namespace) -> int:
with pdb.connect_closing() as conn:
if not args.project:
pdb.set_active(conn, None)
print("Cleared active project")
return 0
proj = _resolve(conn, args.project)
if proj is None:
return 1
pdb.set_active(conn, proj.id)
print(f"Active project: {proj.slug}")
return 0
@_with_project
def _cmd_archive(args, conn, proj) -> int:
pdb.archive_project(conn, proj.id)
print(f"Archived {proj.slug}")
return 0
@_with_project
def _cmd_restore(args, conn, proj) -> int:
pdb.restore_project(conn, proj.id)
print(f"Restored {proj.slug}")
return 0
@_with_project
def _cmd_bind_board(args, conn, proj) -> int:
pdb.update_project(conn, proj.id, board_slug=args.board)
if args.board.strip():
print(f"Bound {proj.slug} -> board {args.board}")
_sync_board_default_workdir(proj, args.board)
else:
print(f"Unbound board from {proj.slug}")
return 0
def _sync_board_default_workdir(proj, board_slug: str) -> None:
"""Best-effort: point the bound board's default_workdir at the primary repo.
Keeps kanban task worktrees anchored to the project's repo. Failures here
are non-fatal the binding itself already succeeded.
"""
if not proj.primary_path:
return
try:
from hermes_cli import kanban_db as kb
slug = kb._normalize_board_slug(board_slug)
if not slug:
return
if slug != kb.DEFAULT_BOARD and not kb.board_exists(slug):
return
kb.write_board_metadata(slug, default_workdir=proj.primary_path)
except Exception:
pass

727
hermes_cli/projects_db.py Normal file
View file

@ -0,0 +1,727 @@
"""Per-profile first-class Project store.
A **Project** is a human-named, multi-folder workspace. Unlike the desktop's
old inferred "workspaces" (derived from each session's ``cwd`` + a git probe)
and unlike kanban's self-generated worktrees, a Project is an explicit,
persisted entity the user creates and names. It anchors:
- **Desktop session grouping** a session belongs to a project when its
``cwd`` lives under one of the project's folders (longest-prefix match).
- **Kanban task worktrees** a task linked to a project creates its worktree
under the project's primary repo with a deterministic branch name, instead
of the random ``wt/<task-id>`` fallback.
Scope: **per-profile**, stored at ``$HERMES_HOME/projects.db`` (resolved via
``get_hermes_home()``), mirroring sessions / config / cron. This deliberately
differs from kanban, whose board DB is root-anchored and shared across
profiles. A Project may *bind* a kanban board (``board_slug``) so the two
systems agree on the repo + branch convention without merging their stores.
The schema is intentionally small and additive: column additions go through
:func:`_add_column_if_missing` so opening an old DB is always safe.
"""
from __future__ import annotations
import contextlib
import os
import re
import secrets
import sqlite3
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable, List, Optional
from hermes_cli.sqlite_util import add_column_if_missing as _add_column_if_missing, write_txn
from hermes_constants import get_hermes_home
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
def projects_db_path() -> Path:
"""The per-profile projects DB path (``$HERMES_HOME/projects.db``).
Profile-aware: ``get_hermes_home()`` already points at the active profile's
home. Tests pass an explicit ``db_path`` to :func:`connect`.
"""
return get_hermes_home() / "projects.db"
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS projects (
id TEXT PRIMARY KEY,
slug TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
description TEXT,
icon TEXT,
color TEXT,
board_slug TEXT,
primary_path TEXT,
created_at INTEGER NOT NULL,
archived INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS project_folders (
project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
path TEXT NOT NULL,
label TEXT,
is_primary INTEGER NOT NULL DEFAULT 0,
added_at INTEGER NOT NULL,
PRIMARY KEY (project_id, path)
);
CREATE INDEX IF NOT EXISTS idx_project_folders_path
ON project_folders(path);
CREATE TABLE IF NOT EXISTS project_meta (
key TEXT PRIMARY KEY,
value TEXT
);
-- Git repos found by scanning the filesystem (desktop "repo-first" discovery).
-- Cached here so the overview is instant after the first scan instead of
-- re-walking the disk every time the Projects view opens.
CREATE TABLE IF NOT EXISTS discovered_repos (
root TEXT PRIMARY KEY,
label TEXT,
last_seen INTEGER NOT NULL
);
"""
# ---------------------------------------------------------------------------
# Slug + id helpers
# ---------------------------------------------------------------------------
# Lowercase alphanumerics, hyphens, underscores; 1-64 chars; no leading
# separator. Strict enough to stop traversal and path separators, loose enough
# for kebab-case names like ``hermes-agent``. Display formatting (spaces,
# emoji, capitalisation) lives in ``name``; the slug is just a stable handle.
_SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9\-_]{0,63}$")
def _slugify(name: str) -> str:
"""Derive a slug candidate from a human name (best-effort)."""
s = str(name or "").strip().lower()
s = re.sub(r"[^a-z0-9]+", "-", s).strip("-_")
s = s[:64].strip("-_")
return s or "project"
def normalize_slug(slug: Optional[str]) -> Optional[str]:
"""Lowercase + strip a slug; validate; return ``None`` for empty."""
if slug is None:
return None
s = str(slug).strip().lower()
if not s:
return None
if not _SLUG_RE.match(s):
raise ValueError(
f"invalid project slug {slug!r}: must be 1-64 chars, lowercase "
f"alphanumerics / hyphens / underscores, not starting with "
f"'-' or '_'"
)
return s
def _new_project_id() -> str:
return "p_" + secrets.token_hex(4)
def _now() -> int:
return int(time.time())
def _normalize_path(path: str) -> str:
"""Absolute, user-expanded, separator-normalized path (no trailing sep)."""
p = os.path.abspath(os.path.expanduser(str(path).strip()))
return p.rstrip("/\\") or p
# ---------------------------------------------------------------------------
# Connection management
# ---------------------------------------------------------------------------
_INITIALIZED_PATHS: set[str] = set()
def connect(db_path: Optional[Path] = None) -> sqlite3.Connection:
"""Open (and initialize if needed) the per-profile projects DB.
WAL with DELETE fallback for network filesystems (shared helper from
``hermes_state``). Schema init is idempotent (``CREATE TABLE IF NOT
EXISTS`` + additive migrations) and cached per-path per-process.
"""
path = db_path if db_path is not None else projects_db_path()
path.parent.mkdir(parents=True, exist_ok=True)
resolved = str(path.resolve())
conn = sqlite3.connect(str(path))
try:
conn.row_factory = sqlite3.Row
from hermes_state import apply_wal_with_fallback
apply_wal_with_fallback(conn, db_label="projects.db")
conn.execute("PRAGMA foreign_keys=ON")
if resolved not in _INITIALIZED_PATHS:
conn.executescript(SCHEMA_SQL)
_migrate_add_optional_columns(conn)
_INITIALIZED_PATHS.add(resolved)
except Exception:
conn.close()
raise
return conn
@contextlib.contextmanager
def connect_closing(db_path: Optional[Path] = None):
"""Open a projects DB connection and guarantee it is closed on exit.
sqlite3's connection context manager only commits/rollbacks; it does NOT
close the file descriptor. Long-lived processes (gateway, dashboard) route
many project operations through ``connect()``; without closing, FDs to
``projects.db`` accumulate. Mirrors ``kanban_db.connect_closing``.
"""
conn = connect(db_path=db_path)
try:
yield conn
finally:
try:
conn.close()
except Exception:
pass
# TEXT columns added to `projects` after v1; re-applied idempotently on every
# open so a legacy DB upgrades in place.
_OPTIONAL_PROJECT_COLUMNS = ("board_slug", "primary_path", "icon", "color")
def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None:
"""Add columns introduced after v1 to legacy DBs (safe on every open)."""
cols = {row["name"] for row in conn.execute("PRAGMA table_info(projects)")}
for col in _OPTIONAL_PROJECT_COLUMNS:
if col not in cols:
_add_column_if_missing(conn, "projects", col, f"{col} TEXT")
# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------
@dataclass
class ProjectFolder:
path: str
label: Optional[str] = None
is_primary: bool = False
added_at: int = 0
def to_dict(self) -> dict:
return {
"path": self.path,
"label": self.label,
"is_primary": bool(self.is_primary),
"added_at": self.added_at,
}
@dataclass
class Project:
id: str
slug: str
name: str
created_at: int
description: Optional[str] = None
icon: Optional[str] = None
color: Optional[str] = None
board_slug: Optional[str] = None
primary_path: Optional[str] = None
archived: bool = False
folders: List[ProjectFolder] = field(default_factory=list)
def to_dict(self) -> dict:
return {
"id": self.id,
"slug": self.slug,
"name": self.name,
"description": self.description,
"icon": self.icon,
"color": self.color,
"board_slug": self.board_slug,
"primary_path": self.primary_path,
"archived": bool(self.archived),
"created_at": self.created_at,
"folders": [f.to_dict() for f in self.folders],
}
def _project_from_row(row: sqlite3.Row) -> Project:
keys = row.keys()
return Project(
id=row["id"],
slug=row["slug"],
name=row["name"],
created_at=row["created_at"],
description=row["description"] if "description" in keys else None,
icon=row["icon"] if "icon" in keys else None,
color=row["color"] if "color" in keys else None,
board_slug=row["board_slug"] if "board_slug" in keys else None,
primary_path=row["primary_path"] if "primary_path" in keys else None,
archived=bool(row["archived"]) if "archived" in keys else False,
)
def _load_folders(conn: sqlite3.Connection, project_id: str) -> List[ProjectFolder]:
rows = conn.execute(
"SELECT path, label, is_primary, added_at FROM project_folders "
"WHERE project_id = ? ORDER BY is_primary DESC, added_at ASC",
(project_id,),
).fetchall()
return [
ProjectFolder(
path=r["path"],
label=r["label"],
is_primary=bool(r["is_primary"]),
added_at=r["added_at"],
)
for r in rows
]
def _attach_folders(conn: sqlite3.Connection, project: Project) -> Project:
project.folders = _load_folders(conn, project.id)
return project
# ---------------------------------------------------------------------------
# CRUD
# ---------------------------------------------------------------------------
def _unique_slug(conn: sqlite3.Connection, candidate: str) -> str:
"""Return ``candidate`` or ``candidate-2``, ``-3`` ... if taken."""
base = candidate
n = 1
slug = base
while conn.execute(
"SELECT 1 FROM projects WHERE slug = ?", (slug,)
).fetchone() is not None:
n += 1
suffix = f"-{n}"
slug = (base[: 64 - len(suffix)]).rstrip("-_") + suffix
return slug
def create_project(
conn: sqlite3.Connection,
*,
name: str,
slug: Optional[str] = None,
folders: Optional[Iterable[str]] = None,
primary_path: Optional[str] = None,
description: Optional[str] = None,
icon: Optional[str] = None,
color: Optional[str] = None,
board_slug: Optional[str] = None,
) -> str:
"""Create a project and return its id.
``folders`` are normalized to absolute paths. If ``primary_path`` is given
it is added to the folder set (if not already present) and marked primary;
otherwise the first folder becomes primary.
"""
name = str(name or "").strip()
if not name:
raise ValueError("project name must not be empty")
slug_candidate = normalize_slug(slug) if slug else _slugify(name)
pid = _new_project_id()
now = _now()
folder_paths: List[str] = []
for f in folders or []:
norm = _normalize_path(f)
if norm and norm not in folder_paths:
folder_paths.append(norm)
primary = _normalize_path(primary_path) if primary_path else None
if primary and primary not in folder_paths:
folder_paths.insert(0, primary)
if primary is None and folder_paths:
primary = folder_paths[0]
with write_txn(conn):
unique = _unique_slug(conn, slug_candidate)
conn.execute(
"INSERT INTO projects "
"(id, slug, name, description, icon, color, board_slug, "
" primary_path, created_at, archived) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 0)",
(
pid,
unique,
name,
description,
icon,
color,
normalize_slug(board_slug) if board_slug else None,
primary,
now,
),
)
for path in folder_paths:
conn.execute(
"INSERT INTO project_folders "
"(project_id, path, label, is_primary, added_at) "
"VALUES (?, ?, ?, ?, ?)",
(pid, path, None, 1 if path == primary else 0, now),
)
return pid
def list_projects(
conn: sqlite3.Connection, *, include_archived: bool = False
) -> List[Project]:
sql = "SELECT * FROM projects"
if not include_archived:
sql += " WHERE archived = 0"
sql += " ORDER BY created_at ASC"
rows = conn.execute(sql).fetchall()
return [_attach_folders(conn, _project_from_row(r)) for r in rows]
def get_project(
conn: sqlite3.Connection, id_or_slug: str
) -> Optional[Project]:
"""Look up a project by id first, then by slug."""
row = conn.execute(
"SELECT * FROM projects WHERE id = ?", (id_or_slug,)
).fetchone()
if row is None:
row = conn.execute(
"SELECT * FROM projects WHERE slug = ?", (str(id_or_slug).lower(),)
).fetchone()
if row is None:
return None
return _attach_folders(conn, _project_from_row(row))
def update_project(
conn: sqlite3.Connection,
project_id: str,
*,
name: Optional[str] = None,
description: Optional[str] = None,
icon: Optional[str] = None,
color: Optional[str] = None,
board_slug: Optional[str] = None,
) -> bool:
"""Patch top-level project fields. Only provided fields change.
``icon``, ``color``, and ``board_slug`` accept an empty string to clear
(store NULL) passing ``None`` leaves the field untouched, so callers that
want to clear must send ``""``.
"""
sets: List[str] = []
params: List[object] = []
if name is not None:
n = str(name).strip()
if not n:
raise ValueError("project name must not be empty")
sets.append("name = ?")
params.append(n)
if description is not None:
sets.append("description = ?")
params.append(description)
if icon is not None:
sets.append("icon = ?")
params.append(icon or None)
if color is not None:
sets.append("color = ?")
params.append(color or None)
if board_slug is not None:
sets.append("board_slug = ?")
params.append(normalize_slug(board_slug) if board_slug.strip() else None)
if not sets:
return False
params.append(project_id)
with write_txn(conn):
cur = conn.execute(
f"UPDATE projects SET {', '.join(sets)} WHERE id = ?", params
)
return cur.rowcount > 0
def add_folder(
conn: sqlite3.Connection,
project_id: str,
path: str,
*,
label: Optional[str] = None,
is_primary: bool = False,
) -> str:
"""Add a folder to a project. Returns the normalized path.
When ``is_primary`` is set, the folder becomes the project's primary repo
(the previous primary is demoted, and ``projects.primary_path`` updates).
"""
norm = _normalize_path(path)
if not norm:
raise ValueError("folder path must not be empty")
if get_project(conn, project_id) is None:
raise ValueError(f"no such project: {project_id}")
now = _now()
with write_txn(conn):
conn.execute(
"INSERT OR IGNORE INTO project_folders "
"(project_id, path, label, is_primary, added_at) "
"VALUES (?, ?, ?, 0, ?)",
(project_id, norm, label, now),
)
if label is not None:
conn.execute(
"UPDATE project_folders SET label = ? "
"WHERE project_id = ? AND path = ?",
(label, project_id, norm),
)
if is_primary:
_set_primary_locked(conn, project_id, norm)
else:
# First folder of an empty project becomes primary implicitly.
existing_primary = conn.execute(
"SELECT 1 FROM project_folders "
"WHERE project_id = ? AND is_primary = 1",
(project_id,),
).fetchone()
if existing_primary is None:
_set_primary_locked(conn, project_id, norm)
return norm
def remove_folder(conn: sqlite3.Connection, project_id: str, path: str) -> bool:
"""Remove a folder from a project. Repoints primary if it was primary."""
norm = _normalize_path(path)
with write_txn(conn):
was_primary = conn.execute(
"SELECT is_primary FROM project_folders "
"WHERE project_id = ? AND path = ?",
(project_id, norm),
).fetchone()
cur = conn.execute(
"DELETE FROM project_folders WHERE project_id = ? AND path = ?",
(project_id, norm),
)
if was_primary is not None and was_primary["is_primary"]:
nxt = conn.execute(
"SELECT path FROM project_folders WHERE project_id = ? "
"ORDER BY added_at ASC LIMIT 1",
(project_id,),
).fetchone()
new_primary = nxt["path"] if nxt else None
if new_primary:
_set_primary_locked(conn, project_id, new_primary)
else:
conn.execute(
"UPDATE projects SET primary_path = NULL WHERE id = ?",
(project_id,),
)
return cur.rowcount > 0
def _set_primary_locked(
conn: sqlite3.Connection, project_id: str, path: str
) -> None:
"""Set the primary folder (caller already holds a write txn)."""
conn.execute(
"UPDATE project_folders SET is_primary = 0 WHERE project_id = ?",
(project_id,),
)
conn.execute(
"UPDATE project_folders SET is_primary = 1 "
"WHERE project_id = ? AND path = ?",
(project_id, path),
)
conn.execute(
"UPDATE projects SET primary_path = ? WHERE id = ?",
(path, project_id),
)
def set_primary(conn: sqlite3.Connection, project_id: str, path: str) -> bool:
norm = _normalize_path(path)
with write_txn(conn):
exists = conn.execute(
"SELECT 1 FROM project_folders WHERE project_id = ? AND path = ?",
(project_id, norm),
).fetchone()
if exists is None:
return False
_set_primary_locked(conn, project_id, norm)
return True
def archive_project(conn: sqlite3.Connection, project_id: str) -> bool:
with write_txn(conn):
cur = conn.execute(
"UPDATE projects SET archived = 1 WHERE id = ?", (project_id,)
)
return cur.rowcount > 0
def restore_project(conn: sqlite3.Connection, project_id: str) -> bool:
with write_txn(conn):
cur = conn.execute(
"UPDATE projects SET archived = 0 WHERE id = ?", (project_id,)
)
return cur.rowcount > 0
def delete_project(conn: sqlite3.Connection, project_id: str) -> bool:
"""Hard-delete a project and its folders (cascade)."""
with write_txn(conn):
cur = conn.execute("DELETE FROM projects WHERE id = ?", (project_id,))
return cur.rowcount > 0
# ---------------------------------------------------------------------------
# Active-project pointer (project_meta KV)
# ---------------------------------------------------------------------------
_ACTIVE_META_KEY = "active_id"
def set_active(conn: sqlite3.Connection, project_id: Optional[str]) -> None:
"""Set (or clear, when ``None``) the active project pointer."""
with write_txn(conn):
if project_id is None:
conn.execute("DELETE FROM project_meta WHERE key = ?", (_ACTIVE_META_KEY,))
else:
conn.execute(
"INSERT INTO project_meta (key, value) VALUES (?, ?) "
"ON CONFLICT(key) DO UPDATE SET value = excluded.value",
(_ACTIVE_META_KEY, project_id),
)
def get_active_id(conn: sqlite3.Connection) -> Optional[str]:
row = conn.execute(
"SELECT value FROM project_meta WHERE key = ?", (_ACTIVE_META_KEY,)
).fetchone()
return row["value"] if row else None
# ---------------------------------------------------------------------------
# Discovered repos (filesystem scan cache)
# ---------------------------------------------------------------------------
def record_discovered_repos(
conn: sqlite3.Connection,
repos: Iterable[tuple[str, Optional[str]]],
*,
replace: bool = False,
) -> int:
"""Persist scanned git repo roots into the cache.
``repos`` is an iterable of ``(root, label)``. Roots are normalized; the
label falls back to the basename. Returns the number of rows written.
When ``replace`` is true, this is the authoritative result of a fresh disk
scan: delete stale rows first so old eval/worktree noise disappears instead
of living forever in the cache.
"""
now = _now()
rows = []
for root, label in repos:
norm = _normalize_path(root)
if not norm:
continue
rows.append((norm, (label or os.path.basename(norm) or norm), now))
with write_txn(conn):
if replace:
conn.execute("DELETE FROM discovered_repos")
if rows:
conn.executemany(
"INSERT INTO discovered_repos (root, label, last_seen) VALUES (?, ?, ?) "
"ON CONFLICT(root) DO UPDATE SET label = excluded.label, "
"last_seen = excluded.last_seen",
rows,
)
return len(rows)
def list_discovered_repos(conn: sqlite3.Connection) -> List[dict]:
"""All cached discovered repo roots, most-recently-seen first."""
rows = conn.execute(
"SELECT root, label, last_seen FROM discovered_repos ORDER BY last_seen DESC"
).fetchall()
return [
{"root": r["root"], "label": r["label"], "last_seen": r["last_seen"]}
for r in rows
]
# ---------------------------------------------------------------------------
# Resolution + naming
# ---------------------------------------------------------------------------
def project_for_path(
conn: sqlite3.Connection, path: str, *, include_archived: bool = False
) -> Optional[Project]:
"""Return the project owning ``path`` (longest-prefix folder match).
A folder owns ``path`` when ``path`` equals the folder or is nested under
it. The most specific (longest) folder wins, so nested projects resolve to
the innermost one.
"""
if not str(path or "").strip():
return None
target = _normalize_path(path)
sql = (
"SELECT pf.project_id AS pid, pf.path AS folder "
"FROM project_folders pf JOIN projects p ON p.id = pf.project_id"
)
if not include_archived:
sql += " WHERE p.archived = 0"
best_pid: Optional[str] = None
best_len = -1
for row in conn.execute(sql).fetchall():
folder = row["folder"]
if target == folder or target.startswith(folder.rstrip("/\\") + os.sep) or \
target.startswith(folder.rstrip("/\\") + "/"):
if len(folder) > best_len:
best_len = len(folder)
best_pid = row["pid"]
if best_pid is None:
return None
return get_project(conn, best_pid)
# Deterministic branch slug: lowercase, separators collapsed, capped.
_BRANCH_SAFE_RE = re.compile(r"[^a-z0-9._-]+")
def branch_name_for(project: Project, task_id: str, *, title: str = "") -> str:
"""Deterministic branch name for a project-linked kanban task.
Shape: ``<project-slug>/<task-id>`` (optionally ``-<title-slug>``). Stable
and human-meaningful, replacing the random ``wt/<task-id>`` fallback.
"""
slug = project.slug or _slugify(project.name)
base = f"{slug}/{task_id}"
if title:
tslug = _BRANCH_SAFE_RE.sub("-", str(title).strip().lower()).strip("-")
tslug = tslug[:40].strip("-")
if tslug:
base = f"{base}-{tslug}"
return base

49
hermes_cli/sqlite_util.py Normal file
View file

@ -0,0 +1,49 @@
"""Shared SQLite primitives for the small per-profile / board stores.
The projects and kanban stores open WAL SQLite files with the same two
primitives an idempotent column-add migration and an IMMEDIATE write
transaction. One definition here keeps the two stores from drifting.
"""
from __future__ import annotations
import contextlib
import sqlite3
def add_column_if_missing(conn: sqlite3.Connection, table: str, column: str, ddl: str) -> bool:
"""``ALTER TABLE <table> ADD COLUMN <ddl>``, idempotent across races.
Returns ``True`` when this call added the column. Swallows the
``duplicate column name`` error a concurrent migrator may have run first
(issue #21708). ``column`` is the human-readable name for the call site;
``ddl`` carries the actual definition.
"""
try:
conn.execute(f"ALTER TABLE {table} ADD COLUMN {ddl}")
return True
except sqlite3.OperationalError as exc:
if "duplicate column name" in str(exc).lower():
return False
raise
@contextlib.contextmanager
def write_txn(conn: sqlite3.Connection):
"""An IMMEDIATE write transaction: at most one concurrent writer wins.
The explicit ROLLBACK is guarded so a SQLite auto-rollback (no active
transaction left under EIO / lock contention / corruption) cannot shadow
the original exception with a spurious rollback error.
"""
conn.execute("BEGIN IMMEDIATE")
try:
yield conn
except Exception:
try:
conn.execute("ROLLBACK")
except sqlite3.OperationalError:
pass
raise
else:
conn.execute("COMMIT")

View file

@ -0,0 +1,84 @@
"""Tests for the `hermes project` CLI dispatch (hermes_cli/projects_cmd)."""
from __future__ import annotations
import argparse
import pytest
from hermes_cli import projects_cmd
from hermes_cli import projects_db as pdb
def _run(argv):
"""Build the project subparser, parse argv, and dispatch. Returns rc."""
parser = argparse.ArgumentParser()
sub = parser.add_subparsers(dest="command")
p = projects_cmd.build_parser(sub)
p.set_defaults(func=projects_cmd.projects_command)
args = parser.parse_args(["project", *argv])
return projects_cmd.projects_command(args)
def test_create_list_show(capsys, tmp_path):
assert _run(["create", "My App", str(tmp_path), "--use"]) == 0
out = capsys.readouterr().out
assert "Created project" in out
with pdb.connect_closing() as conn:
projects = pdb.list_projects(conn)
assert len(projects) == 1
assert projects[0].name == "My App"
# --use set it active.
assert pdb.get_active_id(conn) == projects[0].id
assert _run(["list"]) == 0
assert "my-app" in capsys.readouterr().out
assert _run(["show", "my-app"]) == 0
assert "My App" in capsys.readouterr().out
def test_add_remove_folder(tmp_path):
_run(["create", "P", str(tmp_path / "a")])
assert _run(["add-folder", "p", str(tmp_path / "b")]) == 0
with pdb.connect_closing() as conn:
proj = pdb.get_project(conn, "p")
assert len(proj.folders) == 2
assert _run(["remove-folder", "p", str(tmp_path / "b")]) == 0
with pdb.connect_closing() as conn:
assert len(pdb.get_project(conn, "p").folders) == 1
def test_rename_and_archive(tmp_path):
_run(["create", "Old Name", str(tmp_path)])
assert _run(["rename", "old-name", "New Name"]) == 0
with pdb.connect_closing() as conn:
assert pdb.get_project(conn, "old-name").name == "New Name"
assert _run(["archive", "old-name"]) == 0
with pdb.connect_closing() as conn:
assert pdb.list_projects(conn) == []
assert len(pdb.list_projects(conn, include_archived=True)) == 1
assert _run(["restore", "old-name"]) == 0
with pdb.connect_closing() as conn:
assert len(pdb.list_projects(conn)) == 1
def test_use_clear(tmp_path):
_run(["create", "P", str(tmp_path)])
_run(["use", "p"])
with pdb.connect_closing() as conn:
assert pdb.get_active_id(conn) is not None
_run(["use"])
with pdb.connect_closing() as conn:
assert pdb.get_active_id(conn) is None
def test_unknown_project_returns_error(capsys, tmp_path):
assert _run(["show", "nope"]) == 1
assert "no such project" in capsys.readouterr().err

View file

@ -0,0 +1,174 @@
"""Tests for the per-profile Projects store (hermes_cli/projects_db)."""
from __future__ import annotations
import os
import pytest
from hermes_cli import projects_db as pdb
@pytest.fixture
def conn(tmp_path):
c = pdb.connect(db_path=tmp_path / "projects.db")
try:
yield c
finally:
c.close()
def test_record_and_list_discovered_repos(conn):
n = pdb.record_discovered_repos(conn, [("/www/alpha", "alpha"), ("/www/beta", None)])
assert n == 2
rows = {r["root"]: r["label"] for r in pdb.list_discovered_repos(conn)}
assert rows["/www/alpha"] == "alpha"
# Label defaults to the basename when not given.
assert rows["/www/beta"] == "beta"
def test_record_discovered_repos_upserts(conn):
pdb.record_discovered_repos(conn, [("/www/alpha", "old")])
pdb.record_discovered_repos(conn, [("/www/alpha", "new")])
rows = pdb.list_discovered_repos(conn)
assert len(rows) == 1
assert rows[0]["label"] == "new"
def test_record_discovered_repos_replace_drops_stale_rows(conn):
pdb.record_discovered_repos(conn, [("/www/alpha", "alpha"), ("/www/beta", "beta")])
pdb.record_discovered_repos(conn, [("/www/alpha", "fresh")], replace=True)
rows = {r["root"]: r["label"] for r in pdb.list_discovered_repos(conn)}
assert rows == {"/www/alpha": "fresh"}
def test_create_get_list(conn):
pid = pdb.create_project(conn, name="Hermes Agent", folders=["/tmp/hermes"])
proj = pdb.get_project(conn, pid)
assert proj is not None
assert proj.slug == "hermes-agent"
assert proj.name == "Hermes Agent"
# First folder becomes primary.
assert proj.primary_path == "/tmp/hermes"
assert [f.path for f in proj.folders] == ["/tmp/hermes"]
assert proj.folders[0].is_primary is True
# Lookup by slug too.
assert pdb.get_project(conn, "hermes-agent").id == pid
assert len(pdb.list_projects(conn)) == 1
def test_slug_collision_disambiguates(conn):
pdb.create_project(conn, name="Hermes Agent")
pdb.create_project(conn, name="Hermes Agent")
slugs = sorted(p.slug for p in pdb.list_projects(conn))
assert slugs == ["hermes-agent", "hermes-agent-2"]
def test_empty_name_rejected(conn):
with pytest.raises(ValueError):
pdb.create_project(conn, name=" ")
def test_add_remove_folder_and_primary_repoint(conn):
pid = pdb.create_project(conn, name="P", folders=["/a"])
pdb.add_folder(conn, pid, "/b")
pdb.add_folder(conn, pid, "/c", is_primary=True)
proj = pdb.get_project(conn, pid)
assert proj.primary_path == "/c"
assert {f.path for f in proj.folders} == {"/a", "/b", "/c"}
# Removing the primary repoints to the oldest remaining folder.
pdb.remove_folder(conn, pid, "/c")
proj = pdb.get_project(conn, pid)
assert proj.primary_path == "/a"
# Removing the last folder clears the primary.
pdb.remove_folder(conn, pid, "/a")
pdb.remove_folder(conn, pid, "/b")
proj = pdb.get_project(conn, pid)
assert proj.primary_path is None
assert proj.folders == []
def test_set_primary_requires_existing_folder(conn):
pid = pdb.create_project(conn, name="P", folders=["/a"])
assert pdb.set_primary(conn, pid, "/nope") is False
assert pdb.set_primary(conn, pid, "/a") is True
def test_paths_normalized(conn):
pid = pdb.create_project(conn, name="P", folders=["/a/b/../c/"])
proj = pdb.get_project(conn, pid)
# Trailing slash stripped, .. collapsed.
assert proj.primary_path == "/a/c"
def test_project_for_path_longest_prefix(conn):
outer = pdb.create_project(conn, name="Outer", folders=["/www"])
inner = pdb.create_project(conn, name="Inner", folders=["/www/app"])
assert pdb.project_for_path(conn, "/www/app/src/x.py").id == inner
assert pdb.project_for_path(conn, "/www/other").id == outer
assert pdb.project_for_path(conn, "/elsewhere") is None
# Segment-wise prefix only: /www/app must not match /www/application.
assert pdb.project_for_path(conn, "/www/application").id == outer
def test_project_for_path_skips_archived(conn):
pid = pdb.create_project(conn, name="P", folders=["/www/app"])
pdb.archive_project(conn, pid)
assert pdb.project_for_path(conn, "/www/app/src") is None
# Archived hidden from the default list but visible with include_archived.
assert pdb.list_projects(conn) == []
assert len(pdb.list_projects(conn, include_archived=True)) == 1
pdb.restore_project(conn, pid)
assert pdb.project_for_path(conn, "/www/app/src").id == pid
def test_active_pointer(conn):
pid = pdb.create_project(conn, name="P")
assert pdb.get_active_id(conn) is None
pdb.set_active(conn, pid)
assert pdb.get_active_id(conn) == pid
pdb.set_active(conn, None)
assert pdb.get_active_id(conn) is None
def test_branch_name_for_is_deterministic():
proj = pdb.Project(id="p_1", slug="web-app", name="Web App", created_at=0)
assert pdb.branch_name_for(proj, "t_abc") == "web-app/t_abc"
assert pdb.branch_name_for(proj, "t_abc", title="Add login!") == "web-app/t_abc-add-login"
# Stable across calls.
assert pdb.branch_name_for(proj, "t_abc") == pdb.branch_name_for(proj, "t_abc")
def test_per_profile_isolation(tmp_path):
# Two distinct DB paths stand in for two profiles' HERMES_HOME.
a = pdb.connect(db_path=tmp_path / "a" / "projects.db")
b = pdb.connect(db_path=tmp_path / "b" / "projects.db")
try:
pdb.create_project(a, name="Only In A", folders=["/a"])
assert [p.slug for p in pdb.list_projects(a)] == ["only-in-a"]
assert pdb.list_projects(b) == []
finally:
a.close()
b.close()
def test_db_path_under_hermes_home():
# Resolves under HERMES_HOME (set by the autouse isolation fixture).
assert pdb.projects_db_path().name == "projects.db"
assert os.path.basename(str(pdb.projects_db_path().parent)) # non-empty parent