"""Per-profile first-class Project store. A **Project** is a human-named, multi-folder workspace. Unlike the desktop's old inferred "workspaces" (derived from each session's ``cwd`` + a git probe) and unlike kanban's self-generated worktrees, a Project is an explicit, persisted entity the user creates and names. It anchors: - **Desktop session grouping** — a session belongs to a project when its ``cwd`` lives under one of the project's folders (longest-prefix match). - **Kanban task worktrees** — a task linked to a project creates its worktree under the project's primary repo with a deterministic branch name, instead of the random ``wt/`` fallback. Scope: **per-profile**, stored at ``$HERMES_HOME/projects.db`` (resolved via ``get_hermes_home()``), mirroring sessions / config / cron. This deliberately differs from kanban, whose board DB is root-anchored and shared across profiles. A Project may *bind* a kanban board (``board_slug``) so the two systems agree on the repo + branch convention without merging their stores. The schema is intentionally small and additive: column additions go through :func:`_add_column_if_missing` so opening an old DB is always safe. """ from __future__ import annotations import contextlib import os import re import secrets import sqlite3 import time from dataclasses import dataclass, field from pathlib import Path from typing import Iterable, List, Optional from hermes_cli.sqlite_util import add_column_if_missing as _add_column_if_missing, write_txn from hermes_constants import get_hermes_home # --------------------------------------------------------------------------- # Paths # --------------------------------------------------------------------------- def projects_db_path() -> Path: """The per-profile projects DB path (``$HERMES_HOME/projects.db``). Profile-aware: ``get_hermes_home()`` already points at the active profile's home. Tests pass an explicit ``db_path`` to :func:`connect`. 
""" return get_hermes_home() / "projects.db" # --------------------------------------------------------------------------- # Schema # --------------------------------------------------------------------------- SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS projects ( id TEXT PRIMARY KEY, slug TEXT NOT NULL UNIQUE, name TEXT NOT NULL, description TEXT, icon TEXT, color TEXT, board_slug TEXT, primary_path TEXT, created_at INTEGER NOT NULL, archived INTEGER NOT NULL DEFAULT 0 ); CREATE TABLE IF NOT EXISTS project_folders ( project_id TEXT NOT NULL REFERENCES projects(id) ON DELETE CASCADE, path TEXT NOT NULL, label TEXT, is_primary INTEGER NOT NULL DEFAULT 0, added_at INTEGER NOT NULL, PRIMARY KEY (project_id, path) ); CREATE INDEX IF NOT EXISTS idx_project_folders_path ON project_folders(path); CREATE TABLE IF NOT EXISTS project_meta ( key TEXT PRIMARY KEY, value TEXT ); -- Git repos found by scanning the filesystem (desktop "repo-first" discovery). -- Cached here so the overview is instant after the first scan instead of -- re-walking the disk every time the Projects view opens. CREATE TABLE IF NOT EXISTS discovered_repos ( root TEXT PRIMARY KEY, label TEXT, last_seen INTEGER NOT NULL ); """ # --------------------------------------------------------------------------- # Slug + id helpers # --------------------------------------------------------------------------- # Lowercase alphanumerics, hyphens, underscores; 1-64 chars; no leading # separator. Strict enough to stop traversal and path separators, loose enough # for kebab-case names like ``hermes-agent``. Display formatting (spaces, # emoji, capitalisation) lives in ``name``; the slug is just a stable handle. 
_SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9\-_]{0,63}$")


def _slugify(name: str) -> str:
    """Derive a slug candidate from a human name (best-effort).

    Lowercases the name, collapses every run of characters outside
    ``[a-z0-9]`` into a single hyphen, trims leading/trailing separators and
    caps the result at 64 characters. Falls back to ``"project"`` when
    nothing usable is left (e.g. an empty or all-punctuation name).
    """
    s = str(name or "").strip().lower()
    s = re.sub(r"[^a-z0-9]+", "-", s).strip("-_")
    # Truncation can expose a trailing separator, so strip once more.
    s = s[:64].strip("-_")
    return s or "project"


def normalize_slug(slug: Optional[str]) -> Optional[str]:
    """Lowercase + strip a slug; validate; return ``None`` for empty.

    Returns:
        The normalized slug, or ``None`` when ``slug`` is ``None`` or blank.

    Raises:
        ValueError: if the non-empty slug does not match ``_SLUG_RE``.
    """
    if slug is None:
        return None
    s = str(slug).strip().lower()
    if not s:
        return None
    if not _SLUG_RE.match(s):
        raise ValueError(
            f"invalid project slug {slug!r}: must be 1-64 chars, lowercase "
            f"alphanumerics / hyphens / underscores, not starting with "
            f"'-' or '_'"
        )
    return s


def _new_project_id() -> str:
    """Return a fresh random project id of the form ``p_<8 hex chars>``."""
    return "p_" + secrets.token_hex(4)


def _now() -> int:
    """Current wall-clock time as whole seconds since the epoch."""
    return int(time.time())


def _normalize_path(path: str) -> str:
    """Absolute, user-expanded, separator-normalized path (no trailing sep).

    Returns ``""`` for blank (``None``, empty or whitespace-only) input.
    Without that guard ``os.path.abspath("")`` resolves to the current
    working directory, which would make every caller's ``if not norm``
    emptiness check unreachable and silently bind the process cwd.
    """
    raw = str(path or "").strip()
    if not raw:
        return ""
    p = os.path.abspath(os.path.expanduser(raw))
    # ``or p`` keeps a bare filesystem root ("/") from collapsing to "".
    return p.rstrip("/\\") or p


# ---------------------------------------------------------------------------
# Connection management
# ---------------------------------------------------------------------------

# Resolved DB paths whose schema has already been initialized in this process.
_INITIALIZED_PATHS: set[str] = set()


def connect(db_path: Optional[Path] = None) -> sqlite3.Connection:
    """Open (and initialize if needed) the per-profile projects DB.

    WAL with DELETE fallback for network filesystems (shared helper from
    ``hermes_state``). Schema init is idempotent (``CREATE TABLE IF NOT
    EXISTS`` + additive migrations) and cached per-path per-process. The
    cache is bypassed when the DB file does not exist on disk, so a database
    that was deleted while the process kept running is re-created with its
    tables instead of coming back empty.

    Args:
        db_path: explicit database file; defaults to :func:`projects_db_path`.

    Returns:
        An open connection with ``sqlite3.Row`` rows and foreign keys enabled.
        The caller owns it and must close it (see :func:`connect_closing`).
    """
    path = Path(db_path) if db_path is not None else projects_db_path()
    path.parent.mkdir(parents=True, exist_ok=True)
    resolved = str(path.resolve())
    # Must be sampled before sqlite3.connect(), which may create the file.
    file_missing = not path.exists()
    conn = sqlite3.connect(str(path))
    try:
        conn.row_factory = sqlite3.Row
        # Imported lazily, as in the original, rather than at module import.
        from hermes_state import apply_wal_with_fallback

        apply_wal_with_fallback(conn, db_label="projects.db")
        conn.execute("PRAGMA foreign_keys=ON")
        if file_missing or resolved not in _INITIALIZED_PATHS:
            conn.executescript(SCHEMA_SQL)
            _migrate_add_optional_columns(conn)
            _INITIALIZED_PATHS.add(resolved)
    except Exception:
        # Don't leak the handle when setup fails; re-raise for the caller.
        conn.close()
        raise
    return conn


@contextlib.contextmanager
def connect_closing(db_path: Optional[Path] = None):
    """Open a projects DB connection and guarantee it is closed on exit.

    sqlite3's connection context manager only commits/rollbacks; it does NOT
    close the file descriptor. Long-lived processes (gateway, dashboard)
    route many project operations through ``connect()``; without closing, FDs
    to ``projects.db`` accumulate. Mirrors ``kanban_db.connect_closing``.
    """
    conn = connect(db_path=db_path)
    try:
        yield conn
    finally:
        # Best-effort close: a failure here must not mask an exception
        # raised inside the ``with`` body.
        try:
            conn.close()
        except Exception:
            pass


# TEXT columns added to `projects` after v1; re-applied idempotently on every
# open so a legacy DB upgrades in place.
_OPTIONAL_PROJECT_COLUMNS = ("board_slug", "primary_path", "icon", "color")


def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None:
    """Add columns introduced after v1 to legacy DBs (safe on every open)."""
    # Relies on ``conn.row_factory`` being ``sqlite3.Row`` (set in ``connect``).
    cols = {row["name"] for row in conn.execute("PRAGMA table_info(projects)")}
    for col in _OPTIONAL_PROJECT_COLUMNS:
        if col not in cols:
            _add_column_if_missing(conn, "projects", col, f"{col} TEXT")


# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------


@dataclass
class ProjectFolder:
    """One folder attached to a project (a row of ``project_folders``)."""

    path: str
    label: Optional[str] = None
    is_primary: bool = False
    added_at: int = 0

    def to_dict(self) -> dict:
        """Return a plain-dict view with ``is_primary`` coerced to ``bool``."""
        return {
            "path": self.path,
            "label": self.label,
            "is_primary": bool(self.is_primary),
            "added_at": self.added_at,
        }


@dataclass
class Project:
    """A project row plus its attached folders."""

    id: str
    slug: str
    name: str
    created_at: int
    description: Optional[str] = None
    icon: Optional[str] = None
    color: Optional[str] = None
    board_slug: Optional[str] = None
    primary_path: Optional[str] = None
    archived: bool = False
    folders: List[ProjectFolder] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return a plain-dict view, with folders expanded via ``to_dict``."""
        return {
            "id": self.id,
            "slug": self.slug,
            "name": self.name,
            "description": self.description,
            "icon": self.icon,
            "color": self.color,
            "board_slug": self.board_slug,
            "primary_path": self.primary_path,
            "archived": bool(self.archived),
            "created_at": self.created_at,
            "folders": [f.to_dict() for f in self.folders],
        }


def _project_from_row(row: sqlite3.Row) -> Project:
    """Build a :class:`Project` (without folders) from a ``projects`` row.

    Columns other than ``id``/``slug``/``name``/``created_at`` are read
    defensively: a column absent from the row yields ``None`` (``False`` for
    ``archived``) instead of raising.
    """
    keys = set(row.keys())

    def _optional(col: str):
        return row[col] if col in keys else None

    return Project(
        id=row["id"],
        slug=row["slug"],
        name=row["name"],
        created_at=row["created_at"],
        description=_optional("description"),
        icon=_optional("icon"),
        color=_optional("color"),
        board_slug=_optional("board_slug"),
        primary_path=_optional("primary_path"),
        archived=bool(_optional("archived")),
    )


def _load_folders(conn: sqlite3.Connection, project_id: str) -> List[ProjectFolder]:
    """Load a project's folders, primary first, then oldest-added first."""
    rows = conn.execute(
        "SELECT path, label, is_primary, added_at FROM project_folders "
        "WHERE project_id = ? ORDER BY is_primary DESC, added_at ASC",
        (project_id,),
    ).fetchall()
    return [
        ProjectFolder(
            path=r["path"],
            label=r["label"],
            is_primary=bool(r["is_primary"]),
            added_at=r["added_at"],
        )
        for r in rows
    ]


def _attach_folders(conn: sqlite3.Connection, project: Project) -> Project:
    """Populate ``project.folders`` in place and return the same object."""
    project.folders = _load_folders(conn, project.id)
    return project


# ---------------------------------------------------------------------------
# CRUD
# ---------------------------------------------------------------------------


def _unique_slug(conn: sqlite3.Connection, candidate: str) -> str:
    """Return ``candidate`` or ``candidate-2``, ``-3`` ... if taken."""
    base = candidate
    n = 1
    slug = base
    while conn.execute(
        "SELECT 1 FROM projects WHERE slug = ?", (slug,)
    ).fetchone() is not None:
        n += 1
        suffix = f"-{n}"
        # Trim the base so base + suffix still fits the 64-char slug limit.
        slug = (base[: 64 - len(suffix)]).rstrip("-_") + suffix
    return slug


def create_project(
    conn: sqlite3.Connection,
    *,
    name: str,
    slug: Optional[str] = None,
    folders: Optional[Iterable[str]] = None,
    primary_path: Optional[str] = None,
    description: Optional[str] = None,
    icon: Optional[str] = None,
    color: Optional[str] = None,
    board_slug: Optional[str] = None,
) -> str:
    """Create a project and return its id.

    ``folders`` are normalized to absolute paths; blank entries are skipped
    and duplicates collapsed. If ``primary_path`` is given it is added to the
    folder set (if not already present) and marked primary; otherwise the
    first folder becomes primary.

    A missing, empty or whitespace-only ``slug`` falls back to one derived
    from ``name``. The final slug is de-duplicated against existing projects
    with a numeric suffix.

    Raises:
        ValueError: if ``name`` is blank, or ``slug`` / ``board_slug`` is
            non-blank but invalid.
    """
    name = str(name or "").strip()
    if not name:
        raise ValueError("project name must not be empty")
    # ``normalize_slug`` returns None for blank input (including "   ").
    # Falling back here keeps None out of the NOT NULL ``slug`` column.
    slug_candidate = normalize_slug(slug) or _slugify(name)

    pid = _new_project_id()
    now = _now()

    folder_paths: List[str] = []
    for f in folders or []:
        # Skip blanks before normalizing: a blank path must never be
        # interpreted as the process's current working directory.
        if not str(f or "").strip():
            continue
        norm = _normalize_path(f)
        if norm and norm not in folder_paths:
            folder_paths.append(norm)

    primary: Optional[str] = None
    if primary_path and str(primary_path).strip():
        primary = _normalize_path(primary_path) or None
    if primary and primary not in folder_paths:
        folder_paths.insert(0, primary)
    if primary is None and folder_paths:
        primary = folder_paths[0]

    with write_txn(conn):
        # Resolved inside the transaction so the uniqueness probe and the
        # insert belong to the same write.
        unique = _unique_slug(conn, slug_candidate)
        conn.execute(
            "INSERT INTO projects "
            "(id, slug, name, description, icon, color, board_slug, "
            " primary_path, created_at, archived) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 0)",
            (
                pid,
                unique,
                name,
                description,
                icon,
                color,
                normalize_slug(board_slug) if board_slug else None,
                primary,
                now,
            ),
        )
        for path in folder_paths:
            conn.execute(
                "INSERT INTO project_folders "
                "(project_id, path, label, is_primary, added_at) "
                "VALUES (?, ?, ?, ?, ?)",
                (pid, path, None, 1 if path == primary else 0, now),
            )
    return pid


def list_projects(
    conn: sqlite3.Connection, *, include_archived: bool = False
) -> List[Project]:
    """Return projects (with folders) ordered by creation time, oldest first.

    Archived projects are omitted unless ``include_archived`` is true.
    """
    sql = "SELECT * FROM projects"
    if not include_archived:
        sql += " WHERE archived = 0"
    sql += " ORDER BY created_at ASC"
    rows = conn.execute(sql).fetchall()
    return [_attach_folders(conn, _project_from_row(r)) for r in rows]


def get_project(
    conn: sqlite3.Connection, id_or_slug: str
) -> Optional[Project]:
    """Look up a project by id first, then by slug.

    The slug comparison lowercases the argument; the id comparison is exact.
    Returns ``None`` when neither matches.
    """
    row = conn.execute(
        "SELECT * FROM projects WHERE id = ?", (id_or_slug,)
    ).fetchone()
    if row is None:
        row = conn.execute(
            "SELECT * FROM projects WHERE slug = ?", (str(id_or_slug).lower(),)
        ).fetchone()
    if row is None:
        return None
    return _attach_folders(conn, _project_from_row(row))


def update_project(
    conn: sqlite3.Connection,
    project_id: str,
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    icon: Optional[str] = None,
    color: Optional[str] = None,
    board_slug: Optional[str] = None,
) -> bool:
    """Patch top-level project fields. Only provided fields change.

    ``icon``, ``color``, and ``board_slug`` accept an empty string to clear
    (store NULL) — passing ``None`` leaves the field untouched, so callers
    that want to clear must send ``""``.

    Returns:
        ``True`` if a row was updated; ``False`` if no field was provided or
        no project has this id.

    Raises:
        ValueError: if ``name`` is provided but blank, or ``board_slug`` is
            non-blank but invalid.
    """
    sets: List[str] = []
    params: List[object] = []
    if name is not None:
        n = str(name).strip()
        if not n:
            raise ValueError("project name must not be empty")
        sets.append("name = ?")
        params.append(n)
    if description is not None:
        sets.append("description = ?")
        params.append(description)
    if icon is not None:
        sets.append("icon = ?")
        params.append(icon or None)
    if color is not None:
        sets.append("color = ?")
        params.append(color or None)
    if board_slug is not None:
        sets.append("board_slug = ?")
        params.append(normalize_slug(board_slug) if board_slug.strip() else None)
    if not sets:
        return False
    params.append(project_id)
    with write_txn(conn):
        # Only fixed column names are interpolated; values stay parameterized.
        cur = conn.execute(
            f"UPDATE projects SET {', '.join(sets)} WHERE id = ?", params
        )
    return cur.rowcount > 0


def add_folder(
    conn: sqlite3.Connection,
    project_id: str,
    path: str,
    *,
    label: Optional[str] = None,
    is_primary: bool = False,
) -> str:
    """Add a folder to a project. Returns the normalized path.

    ``project_id`` may be an id or a slug; it is resolved to the canonical id
    before any row is written. When ``is_primary`` is set, the folder becomes
    the project's primary repo (the previous primary is demoted, and
    ``projects.primary_path`` updates). Adding a folder that is already
    attached is a no-op apart from an optional label / primary update.

    Raises:
        ValueError: if ``path`` is blank or the project does not exist.
    """
    # Validate the raw input: normalizing a blank string must never be
    # allowed to stand in for the current working directory.
    if not str(path or "").strip():
        raise ValueError("folder path must not be empty")
    norm = _normalize_path(path)
    if not norm:
        raise ValueError("folder path must not be empty")
    project = get_project(conn, project_id)
    if project is None:
        raise ValueError(f"no such project: {project_id}")
    # ``get_project`` also matches slugs. Use the row's real id from here on,
    # otherwise a slug would pass the check above and then break the
    # ``project_folders.project_id`` foreign key on insert.
    pid = project.id
    now = _now()
    with write_txn(conn):
        conn.execute(
            "INSERT OR IGNORE INTO project_folders "
            "(project_id, path, label, is_primary, added_at) "
            "VALUES (?, ?, ?, 0, ?)",
            (pid, norm, label, now),
        )
        if label is not None:
            # Covers the already-attached case, where the INSERT was ignored.
            conn.execute(
                "UPDATE project_folders SET label = ? "
                "WHERE project_id = ? AND path = ?",
                (label, pid, norm),
            )
        if is_primary:
            _set_primary_locked(conn, pid, norm)
        else:
            # First folder of an empty project becomes primary implicitly.
            existing_primary = conn.execute(
                "SELECT 1 FROM project_folders "
                "WHERE project_id = ? AND is_primary = 1",
                (pid,),
            ).fetchone()
            if existing_primary is None:
                _set_primary_locked(conn, pid, norm)
    return norm


def remove_folder(conn: sqlite3.Connection, project_id: str, path: str) -> bool:
    """Remove a folder from a project. Repoints primary if it was primary.

    When the removed folder was primary, the oldest remaining folder is
    promoted; if none remain, ``projects.primary_path`` is cleared.

    Returns:
        ``True`` if a folder row was deleted.
    """
    norm = _normalize_path(path)
    with write_txn(conn):
        was_primary = conn.execute(
            "SELECT is_primary FROM project_folders "
            "WHERE project_id = ? AND path = ?",
            (project_id, norm),
        ).fetchone()
        cur = conn.execute(
            "DELETE FROM project_folders WHERE project_id = ? AND path = ?",
            (project_id, norm),
        )
        if was_primary is not None and was_primary["is_primary"]:
            nxt = conn.execute(
                "SELECT path FROM project_folders WHERE project_id = ? "
                "ORDER BY added_at ASC LIMIT 1",
                (project_id,),
            ).fetchone()
            new_primary = nxt["path"] if nxt else None
            if new_primary:
                _set_primary_locked(conn, project_id, new_primary)
            else:
                conn.execute(
                    "UPDATE projects SET primary_path = NULL WHERE id = ?",
                    (project_id,),
                )
    return cur.rowcount > 0


def _set_primary_locked(
    conn: sqlite3.Connection, project_id: str, path: str
) -> None:
    """Set the primary folder (caller already holds a write txn).

    Demotes every folder of the project, promotes ``path``, and mirrors it
    into ``projects.primary_path``.
    """
    conn.execute(
        "UPDATE project_folders SET is_primary = 0 WHERE project_id = ?",
        (project_id,),
    )
    conn.execute(
        "UPDATE project_folders SET is_primary = 1 "
        "WHERE project_id = ? AND path = ?",
        (project_id, path),
    )
    conn.execute(
        "UPDATE projects SET primary_path = ? WHERE id = ?",
        (path, project_id),
    )


def set_primary(conn: sqlite3.Connection, project_id: str, path: str) -> bool:
    """Make an already-attached folder the project's primary.

    Returns ``False`` (changing nothing) when ``path`` is not one of the
    project's folders, ``True`` otherwise.
    """
    norm = _normalize_path(path)
    with write_txn(conn):
        exists = conn.execute(
            "SELECT 1 FROM project_folders WHERE project_id = ? AND path = ?",
            (project_id, norm),
        ).fetchone()
        if exists is None:
            return False
        _set_primary_locked(conn, project_id, norm)
    return True


def archive_project(conn: sqlite3.Connection, project_id: str) -> bool:
    """Mark a project archived. Returns ``True`` if a row matched the id."""
    with write_txn(conn):
        cur = conn.execute(
            "UPDATE projects SET archived = 1 WHERE id = ?", (project_id,)
        )
    return cur.rowcount > 0


def restore_project(conn: sqlite3.Connection, project_id: str) -> bool:
    """Clear a project's archived flag. Returns ``True`` if a row matched."""
    with write_txn(conn):
        cur = conn.execute(
            "UPDATE projects SET archived = 0 WHERE id = ?", (project_id,)
        )
    return cur.rowcount > 0


def delete_project(conn: sqlite3.Connection, project_id: str) -> bool:
    """Hard-delete a project and its folders (cascade)."""
    with write_txn(conn):
        cur = conn.execute("DELETE FROM projects WHERE id = ?", (project_id,))
    return cur.rowcount > 0


# ---------------------------------------------------------------------------
# Active-project pointer (project_meta KV)
# ---------------------------------------------------------------------------

_ACTIVE_META_KEY = "active_id"


def set_active(conn: sqlite3.Connection, project_id: Optional[str]) -> None:
    """Set (or clear, when ``None``) the active project pointer."""
    with write_txn(conn):
        if project_id is None:
            conn.execute("DELETE FROM project_meta WHERE key = ?", (_ACTIVE_META_KEY,))
        else:
            conn.execute(
                "INSERT INTO project_meta (key, value) VALUES (?, ?) "
                "ON CONFLICT(key) DO UPDATE SET value = excluded.value",
                (_ACTIVE_META_KEY, project_id),
            )


def get_active_id(conn: sqlite3.Connection) -> Optional[str]:
    """Return the stored active project id, or ``None`` if none is set."""
    row = conn.execute(
        "SELECT value FROM project_meta WHERE key = ?", (_ACTIVE_META_KEY,)
    ).fetchone()
    return row["value"] if row else None


# ---------------------------------------------------------------------------
# Discovered repos (filesystem scan cache)
# ---------------------------------------------------------------------------


def record_discovered_repos(
    conn: sqlite3.Connection,
    repos: Iterable[tuple[str, Optional[str]]],
    *,
    replace: bool = False,
) -> int:
    """Persist scanned git repo roots into the cache.

    ``repos`` is an iterable of ``(root, label)``. Roots are normalized and
    blank roots are skipped; the label falls back to the basename. Returns
    the number of rows written.

    When ``replace`` is true, this is the authoritative result of a fresh
    disk scan: delete stale rows first so old eval/worktree noise disappears
    instead of living forever in the cache.
    """
    now = _now()
    rows = []
    for root, label in repos:
        # Drop blanks before normalizing so an empty root is never cached as
        # the process's current working directory.
        if not str(root or "").strip():
            continue
        norm = _normalize_path(root)
        if not norm:
            continue
        rows.append((norm, (label or os.path.basename(norm) or norm), now))
    with write_txn(conn):
        if replace:
            conn.execute("DELETE FROM discovered_repos")
        if rows:
            conn.executemany(
                "INSERT INTO discovered_repos (root, label, last_seen) VALUES (?, ?, ?) "
                "ON CONFLICT(root) DO UPDATE SET label = excluded.label, "
                "last_seen = excluded.last_seen",
                rows,
            )
    return len(rows)


def list_discovered_repos(conn: sqlite3.Connection) -> List[dict]:
    """All cached discovered repo roots, most-recently-seen first."""
    rows = conn.execute(
        "SELECT root, label, last_seen FROM discovered_repos ORDER BY last_seen DESC"
    ).fetchall()
    return [
        {"root": r["root"], "label": r["label"], "last_seen": r["last_seen"]}
        for r in rows
    ]


# ---------------------------------------------------------------------------
# Resolution + naming
# ---------------------------------------------------------------------------


def project_for_path(
    conn: sqlite3.Connection, path: str, *, include_archived: bool = False
) -> Optional[Project]:
    """Return the project owning ``path`` (longest-prefix folder match).

    A folder owns ``path`` when ``path`` equals the folder or is nested under
    it. The most specific (longest) folder wins, so nested projects resolve
    to the innermost one. Returns ``None`` for a blank path or when no folder
    matches.
    """
    if not str(path or "").strip():
        return None
    target = _normalize_path(path)
    sql = (
        "SELECT pf.project_id AS pid, pf.path AS folder "
        "FROM project_folders pf JOIN projects p ON p.id = pf.project_id"
    )
    if not include_archived:
        sql += " WHERE p.archived = 0"
    best_pid: Optional[str] = None
    best_len = -1
    for row in conn.execute(sql).fetchall():
        folder = row["folder"]
        prefix = folder.rstrip("/\\")
        # Require a separator after the prefix so "/a/foo" does not claim
        # "/a/foobar". Both the platform separator and "/" are accepted.
        if target == folder or target.startswith((prefix + os.sep, prefix + "/")):
            if len(folder) > best_len:
                best_len = len(folder)
                best_pid = row["pid"]
    if best_pid is None:
        return None
    return get_project(conn, best_pid)


# Deterministic branch slug: lowercase, separators collapsed, capped.
_BRANCH_SAFE_RE = re.compile(r"[^a-z0-9._-]+")


def branch_name_for(project: Project, task_id: str, *, title: str = "") -> str:
    """Deterministic branch name for a project-linked kanban task.

    Shape: ``<project-slug>/<task_id>``, optionally followed by
    ``-<title-slug>`` when ``title`` yields a non-empty slug. The title slug
    is lowercased, has every run of characters outside ``[a-z0-9._-]``
    replaced by ``-``, and is capped at 40 characters. Stable and
    human-meaningful, replacing the random ``wt/`` fallback.
    """
    slug = project.slug or _slugify(project.name)
    base = f"{slug}/{task_id}"
    if title:
        tslug = _BRANCH_SAFE_RE.sub("-", str(title).strip().lower()).strip("-")
        # Truncation can leave a trailing hyphen; strip again.
        tslug = tslug[:40].strip("-")
        if tslug:
            base = f"{base}-{tslug}"
    return base