feat(kanban): archive --rm to hard-delete archived tasks

Salvages #19964 by @Beandon13. Adds `hermes kanban archive --rm` to
permanently remove already-archived tasks with cascading cleanup of
links, comments, events, runs, and notify-subs. Safety guard: only
archived tasks can be deleted; active/blocked/done must be archived
first.

Cherry-picked from #19964 onto current main (severe stale base, applied
manually to preserve substance only).
This commit is contained in:
Beandon13 2026-05-18 20:09:13 -07:00 committed by Teknium
parent 06161c6ed8
commit bde6313e34
4 changed files with 110 additions and 2 deletions

View file

@ -435,7 +435,15 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu
p_unblock.add_argument("task_ids", nargs="+")
p_archive = sub.add_parser("archive", help="Archive one or more tasks")
p_archive.add_argument("task_ids", nargs="+")
p_archive.add_argument("task_ids", nargs="*",
help="Task ids to archive (default mode)")
p_archive.add_argument(
"--rm",
dest="purge_ids",
nargs="+",
default=None,
help="Permanently delete already-archived task ids from the board",
)
# --- tail ---
p_tail = sub.add_parser("tail", help="Follow a task's event stream")
@ -1692,11 +1700,23 @@ def _cmd_unblock(args: argparse.Namespace) -> int:
def _cmd_archive(args: argparse.Namespace) -> int:
ids = list(args.task_ids or [])
if not ids:
purge_ids = list(getattr(args, "purge_ids", None) or [])
if ids and purge_ids:
print("choose either task_ids to archive or --rm archived task_ids", file=sys.stderr)
return 1
if not ids and not purge_ids:
print("at least one task_id is required", file=sys.stderr)
return 1
failed: list[str] = []
with kb.connect() as conn:
if purge_ids:
for tid in purge_ids:
if not kb.delete_archived_task(conn, tid):
failed.append(tid)
print(f"cannot delete {tid} (must already be archived)", file=sys.stderr)
else:
print(f"Deleted {tid}")
return 0 if not failed else 1
for tid in ids:
if not kb.archive_task(conn, tid):
failed.append(tid)

View file

@ -3014,6 +3014,32 @@ def archive_task(conn: sqlite3.Connection, task_id: str) -> bool:
return True
def delete_archived_task(conn: sqlite3.Connection, task_id: str) -> bool:
"""Permanently remove an already-archived task and its related rows.
Safety guard: only archived tasks can be deleted. Active / blocked / done
tasks must be explicitly archived first so accidental data loss requires a
second deliberate action.
"""
with write_txn(conn):
row = conn.execute(
"SELECT status FROM tasks WHERE id = ?",
(task_id,),
).fetchone()
if not row or row["status"] != "archived":
return False
conn.execute(
"DELETE FROM task_links WHERE parent_id = ? OR child_id = ?",
(task_id, task_id),
)
conn.execute("DELETE FROM task_comments WHERE task_id = ?", (task_id,))
conn.execute("DELETE FROM task_events WHERE task_id = ?", (task_id,))
conn.execute("DELETE FROM task_runs WHERE task_id = ?", (task_id,))
conn.execute("DELETE FROM kanban_notify_subs WHERE task_id = ?", (task_id,))
cur = conn.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
return cur.rowcount == 1
# ---------------------------------------------------------------------------
# Workspace resolution
# ---------------------------------------------------------------------------