diff --git a/hermes_cli/kanban.py b/hermes_cli/kanban.py index 1e7169c26cf..f683f69edee 100644 --- a/hermes_cli/kanban.py +++ b/hermes_cli/kanban.py @@ -1021,7 +1021,7 @@ def _board_task_counts(slug: str) -> dict[str, int]: path = kb.kanban_db_path(board=slug) if not path.exists(): return {} - with kb.connect(board=slug) as conn: + with kb.connect_closing(board=slug) as conn: rows = conn.execute( "SELECT status, COUNT(*) AS n FROM tasks GROUP BY status" ).fetchall() @@ -1264,7 +1264,7 @@ def _cmd_init(args: argparse.Namespace) -> int: def _cmd_heartbeat(args: argparse.Namespace) -> int: - with kb.connect() as conn: + with kb.connect_closing() as conn: ok = kb.heartbeat_worker( conn, args.task_id, @@ -1279,7 +1279,7 @@ def _cmd_heartbeat(args: argparse.Namespace) -> int: def _cmd_assignees(args: argparse.Namespace) -> int: - with kb.connect() as conn: + with kb.connect_closing() as conn: data = kb.known_assignees(conn) if getattr(args, "json", False): print(json.dumps(data, indent=2, ensure_ascii=False)) @@ -1320,7 +1320,7 @@ def _cmd_create(args: argparse.Namespace) -> int: file=sys.stderr, ) return 2 - with kb.connect() as conn: + with kb.connect_closing() as conn: task_id = kb.create_task( conn, title=args.title, @@ -1369,7 +1369,7 @@ def _cmd_swarm(args: argparse.Namespace) -> int: if not workers: print("kanban swarm: at least one --worker is required", file=sys.stderr) return 2 - with kb.connect() as conn: + with kb.connect_closing() as conn: created = ks.create_swarm( conn, goal=args.goal, @@ -1395,7 +1395,7 @@ def _cmd_list(args: argparse.Namespace) -> int: assignee = args.assignee if args.mine and not assignee: assignee = _profile_author() - with kb.connect() as conn: + with kb.connect_closing() as conn: # Cheap "mini-dispatch": recompute ready so list output reflects # dependencies that may have cleared since the last dispatcher tick. kb.recompute_ready(conn) @@ -1444,7 +1444,7 @@ def _cmd_show(args: argparse.Namespace) -> int: file=sys.stderr, ) return 2 - with kb.connect() as conn: + with kb.connect_closing() as conn: task = kb.get_task(conn, args.task_id) if not task: print(f"no such task: {args.task_id}", file=sys.stderr) @@ -1610,7 +1610,7 @@ def _cmd_show(args: argparse.Namespace) -> int: def _cmd_assign(args: argparse.Namespace) -> int: profile = None if args.profile.lower() in {"none", "-", "null"} else args.profile - with kb.connect() as conn: + with kb.connect_closing() as conn: ok = kb.assign_task(conn, args.task_id, profile) if not ok: print(f"no such task: {args.task_id}", file=sys.stderr) @@ -1620,7 +1620,7 @@ def _cmd_assign(args: argparse.Namespace) -> int: def _cmd_reclaim(args: argparse.Namespace) -> int: - with kb.connect() as conn: + with kb.connect_closing() as conn: ok = kb.reclaim_task( conn, args.task_id, reason=getattr(args, "reason", None), @@ -1637,7 +1637,7 @@ def _cmd_reclaim(args: argparse.Namespace) -> int: def _cmd_reassign(args: argparse.Namespace) -> int: profile = None if args.profile.lower() in {"none", "-", "null"} else args.profile - with kb.connect() as conn: + with kb.connect_closing() as conn: ok = kb.reassign_task( conn, args.task_id, profile, reclaim_first=bool(getattr(args, "reclaim", False)), @@ -1667,7 +1667,7 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int: diag_config = kd.config_from_runtime_config(load_config()) - with kb.connect() as conn: + with kb.connect_closing() as conn: # Either one-task mode or fleet mode. if getattr(args, "task", None): task = kb.get_task(conn, args.task) @@ -1790,14 +1790,14 @@ def _cmd_diagnostics(args: argparse.Namespace) -> int: def _cmd_link(args: argparse.Namespace) -> int: - with kb.connect() as conn: + with kb.connect_closing() as conn: kb.link_tasks(conn, args.parent_id, args.child_id) print(f"Linked {args.parent_id} -> {args.child_id}") return 0 def _cmd_unlink(args: argparse.Namespace) -> int: - with kb.connect() as conn: + with kb.connect_closing() as conn: ok = kb.unlink_tasks(conn, args.parent_id, args.child_id) if not ok: print(f"No such link: {args.parent_id} -> {args.child_id}", file=sys.stderr) @@ -1807,7 +1807,7 @@ def _cmd_unlink(args: argparse.Namespace) -> int: def _cmd_claim(args: argparse.Namespace) -> int: - with kb.connect() as conn: + with kb.connect_closing() as conn: task = kb.claim_task(conn, args.task_id, ttl_seconds=args.ttl) if task is None: # Report why @@ -1838,7 +1838,7 @@ def _cmd_comment(args: argparse.Namespace) -> int: suffix = f"\n\n[trimmed to {args.max_len} chars by --max-len]" body = body[: max(0, args.max_len - len(suffix))].rstrip() + suffix author = args.author or _profile_author() - with kb.connect() as conn: + with kb.connect_closing() as conn: kb.add_comment(conn, args.task_id, author, body) print(f"Comment added to {args.task_id}") return 0 @@ -1885,7 +1885,7 @@ def _cmd_complete(args: argparse.Namespace) -> int: print(f"kanban: --metadata: {exc}", file=sys.stderr) return 2 failed: list[str] = [] - with kb.connect() as conn: + with kb.connect_closing() as conn: for tid in ids: if not kb.complete_task( conn, tid, @@ -1912,7 +1912,7 @@ def _cmd_edit(args: argparse.Namespace) -> int: except (ValueError, json.JSONDecodeError) as exc: print(f"kanban: --metadata: {exc}", file=sys.stderr) return 2 - with kb.connect() as conn: + with kb.connect_closing() as conn: if not kb.edit_completed_task_result( conn, args.task_id, @@ -1934,7 +1934,7 @@ def _cmd_block(args: argparse.Namespace) -> int: author = _profile_author() ids = [args.task_id] + list(getattr(args, "ids", None) or []) failed: list[str] = [] - with kb.connect() as conn: + with kb.connect_closing() as conn: for tid in ids: if reason: kb.add_comment(conn, tid, author, f"BLOCKED: {reason}") @@ -1956,7 +1956,7 @@ def _cmd_schedule(args: argparse.Namespace) -> int: author = _profile_author() ids = [args.task_id] + list(getattr(args, "ids", None) or []) failed: list[str] = [] - with kb.connect() as conn: + with kb.connect_closing() as conn: for tid in ids: if reason: kb.add_comment(conn, tid, author, f"SCHEDULED: {reason}") @@ -1979,7 +1979,7 @@ def _cmd_unblock(args: argparse.Namespace) -> int: print("at least one task_id is required", file=sys.stderr) return 1 failed: list[str] = [] - with kb.connect() as conn: + with kb.connect_closing() as conn: for tid in ids: if not kb.unblock_task(conn, tid): failed.append(tid) @@ -2003,7 +2003,7 @@ def _cmd_promote(args: argparse.Namespace) -> int: seen.add(tid) results: list[dict[str, object]] = [] - with kb.connect() as conn: + with kb.connect_closing() as conn: for tid in ids: ok, err = kb.promote_task( conn, @@ -2050,7 +2050,7 @@ def _cmd_archive(args: argparse.Namespace) -> int: print("at least one task_id is required", file=sys.stderr) return 1 failed: list[str] = [] - with kb.connect() as conn: + with kb.connect_closing() as conn: if purge_ids: for tid in purge_ids: if not kb.delete_archived_task(conn, tid): @@ -2073,7 +2073,7 @@ def _cmd_tail(args: argparse.Namespace) -> int: print(f"Tailing events for {args.task_id}. Ctrl-C to stop.") try: while True: - with kb.connect() as conn: + with kb.connect_closing() as conn: events = kb.list_events(conn, args.task_id) for e in events: if e.id > last_id: @@ -2087,7 +2087,7 @@ def _cmd_tail(args: argparse.Namespace) -> int: def _cmd_dispatch(args: argparse.Namespace) -> int: - with kb.connect() as conn: + with kb.connect_closing() as conn: res = kb.dispatch_once( conn, dry_run=args.dry_run, @@ -2257,7 +2257,7 @@ def _cmd_daemon(args: argparse.Namespace) -> int: from the dispatcher's perspective, not stuck. """ try: - with kb.connect() as conn: + with kb.connect_closing() as conn: return kb.has_spawnable_ready(conn) except Exception: return False @@ -2288,7 +2288,7 @@ def _cmd_watch(args: argparse.Namespace) -> int: cursor = 0 print("Watching kanban events. Ctrl-C to stop.", flush=True) # Seed cursor at the latest id so we don't replay history. - with kb.connect() as conn: + with kb.connect_closing() as conn: row = conn.execute( "SELECT COALESCE(MAX(id), 0) AS m FROM task_events" ).fetchone() @@ -2296,7 +2296,7 @@ def _cmd_watch(args: argparse.Namespace) -> int: try: while True: - with kb.connect() as conn: + with kb.connect_closing() as conn: rows = conn.execute( "SELECT e.id, e.task_id, e.kind, e.payload, e.created_at, " " t.assignee, t.tenant " @@ -2329,7 +2329,7 @@ def _cmd_watch(args: argparse.Namespace) -> int: def _cmd_stats(args: argparse.Namespace) -> int: - with kb.connect() as conn: + with kb.connect_closing() as conn: stats = kb.board_stats(conn) if getattr(args, "json", False): print(json.dumps(stats, indent=2, ensure_ascii=False)) @@ -2349,7 +2349,7 @@ def _cmd_stats(args: argparse.Namespace) -> int: def _cmd_notify_subscribe(args: argparse.Namespace) -> int: - with kb.connect() as conn: + with kb.connect_closing() as conn: if kb.get_task(conn, args.task_id) is None: print(f"no such task: {args.task_id}", file=sys.stderr) return 1 @@ -2366,7 +2366,7 @@ def _cmd_notify_subscribe(args: argparse.Namespace) -> int: def _cmd_notify_list(args: argparse.Namespace) -> int: - with kb.connect() as conn: + with kb.connect_closing() as conn: subs = kb.list_notify_subs(conn, args.task_id) if getattr(args, "json", False): print(json.dumps(subs, indent=2, ensure_ascii=False)) @@ -2383,7 +2383,7 @@ def _cmd_notify_list(args: argparse.Namespace) -> int: def _cmd_notify_unsubscribe(args: argparse.Namespace) -> int: - with kb.connect() as conn: + with kb.connect_closing() as conn: ok = kb.remove_notify_sub( conn, task_id=args.task_id, platform=args.platform, chat_id=args.chat_id, @@ -2417,7 +2417,7 @@ def _cmd_runs(args: argparse.Namespace) -> int: file=sys.stderr, ) return 2 - with kb.connect() as conn: + with kb.connect_closing() as conn: runs = kb.list_runs(conn, args.task_id, **rsk) if getattr(args, "json", False): print(json.dumps([ @@ -2456,7 +2456,7 @@ def _cmd_runs(args: argparse.Namespace) -> int: def _cmd_context(args: argparse.Namespace) -> int: - with kb.connect() as conn: + with kb.connect_closing() as conn: text = kb.build_worker_context(conn, args.task_id) print(text) return 0 @@ -2622,7 +2622,7 @@ def _cmd_gc(args: argparse.Namespace) -> int: import shutil scratch_root = kb.workspaces_root() removed_ws = 0 - with kb.connect() as conn: + with kb.connect_closing() as conn: rows = conn.execute( "SELECT id, workspace_kind, workspace_path FROM tasks WHERE status = 'archived'" ).fetchall() @@ -2645,7 +2645,7 @@ def _cmd_gc(args: argparse.Namespace) -> int: event_days = getattr(args, "event_retention_days", 30) log_days = getattr(args, "log_retention_days", 30) - with kb.connect() as conn: + with kb.connect_closing() as conn: removed_events = kb.gc_events( conn, older_than_seconds=event_days * 24 * 3600, ) diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 55a981dbef3..aa28b07e2db 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -1236,6 +1236,41 @@ def connect( return conn +@contextlib.contextmanager +def connect_closing( + db_path: Optional[Path] = None, + *, + board: Optional[str] = None, +): + """Open a kanban DB connection and guarantee it is closed on exit. + + Use this instead of ``with kb.connect() as conn:`` — sqlite3's + built-in connection context manager only commits/rollbacks the + transaction; it does NOT close the file descriptor. In long-lived + processes (gateway, dashboard) that route every kanban operation + through ``connect()`` (e.g. ``run_slash`` dispatching ``/kanban …`` + commands, ``decompose_task_endpoint`` calling + ``kanban_decompose.decompose_task``), the unclosed connections + accumulate as open FDs to ``kanban.db`` and ``kanban.db-wal``. After + enough operations the process hits the kernel FD limit and dies + with ``[Errno 24] Too many open files``. + + See #33159 for the production incident. + + The ``connect()`` function itself remains unchanged so callers that + intentionally manage the connection lifetime (tests, long-lived + callers) continue to work. + """ + conn = connect(db_path=db_path, board=board) + try: + yield conn + finally: + try: + conn.close() + except Exception: + pass + + def init_db( db_path: Optional[Path] = None, *, diff --git a/hermes_cli/kanban_decompose.py b/hermes_cli/kanban_decompose.py index 063abcf7b51..dec7c0b7c72 100644 --- a/hermes_cli/kanban_decompose.py +++ b/hermes_cli/kanban_decompose.py @@ -281,7 +281,7 @@ def decompose_task( configured, API error, malformed response, decomposer returned fanout=true with empty task list) — those surface via ``ok=False``. """ - with kb.connect() as conn: + with kb.connect_closing() as conn: task = kb.get_task(conn, task_id) if task is None: return DecomposeOutcome(task_id, False, "unknown task id") @@ -370,7 +370,7 @@ def decompose_task( return DecomposeOutcome( task_id, False, "decomposer returned fanout=false with no title/body", ) - with kb.connect() as conn: + with kb.connect_closing() as conn: ok = kb.specify_triage_task( conn, task_id, @@ -439,7 +439,7 @@ def decompose_task( }) try: - with kb.connect() as conn: + with kb.connect_closing() as conn: child_ids = kb.decompose_triage_task( conn, task_id, @@ -467,7 +467,7 @@ def decompose_task( def list_triage_ids(*, tenant: Optional[str] = None) -> list[str]: """Return task ids currently in the triage column.""" - with kb.connect() as conn: + with kb.connect_closing() as conn: rows = kb.list_tasks( conn, status="triage", diff --git a/hermes_cli/kanban_specify.py b/hermes_cli/kanban_specify.py index 1ad576bf8f1..4bfcce61ee9 100644 --- a/hermes_cli/kanban_specify.py +++ b/hermes_cli/kanban_specify.py @@ -150,7 +150,7 @@ def specify_task( error, malformed response) — those surface via ``ok=False`` so the ``--all`` sweep can continue past individual failures. """ - with kb.connect() as conn: + with kb.connect_closing() as conn: task = kb.get_task(conn, task_id) if task is None: return SpecifyOutcome(task_id, False, "unknown task id") @@ -239,7 +239,7 @@ def specify_task( task_id, False, "LLM response missing title and body" ) - with kb.connect() as conn: + with kb.connect_closing() as conn: ok = kb.specify_triage_task( conn, task_id, @@ -261,7 +261,7 @@ def list_triage_ids(*, tenant: Optional[str] = None) -> list[str]: ``tenant`` narrows the sweep; ``None`` returns every triage task. """ - with kb.connect() as conn: + with kb.connect_closing() as conn: tasks = kb.list_tasks( conn, status="triage", diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py index 30cb8421a20..b9e9a7ee9c1 100644 --- a/tests/hermes_cli/test_kanban_db.py +++ b/tests/hermes_cli/test_kanban_db.py @@ -3805,3 +3805,66 @@ def test_dispatch_once_still_reaps_via_extracted_fn(kanban_home): pids = kb.reap_worker_zombies() assert pids == [99999] + + + +# --------------------------------------------------------------------------- +# connect_closing(): context manager that actually closes the FD +# Regression coverage for #33159 (kanban.db FD leak — gateway crashes after +# ~4 days). sqlite3.Connection's built-in __exit__ commits/rollbacks but +# does NOT close, so `with kb.connect() as conn:` leaks the FD in +# long-lived processes (gateway run_slash, dashboard decompose handler). +# `connect_closing()` is the leak-safe replacement. +# --------------------------------------------------------------------------- + + +def test_connect_closing_closes_connection_on_exit(tmp_path): + """The new context manager MUST actually close the underlying FD.""" + db_path = tmp_path / "kanban.db" + kb._INITIALIZED_PATHS.discard(str(db_path.resolve())) + with kb.connect_closing(db_path=db_path) as conn: + conn.execute("SELECT 1").fetchone() + # After exit, the connection MUST be closed — subsequent execute + # should raise ProgrammingError. + with pytest.raises(sqlite3.ProgrammingError): + conn.execute("SELECT 1") + + +def test_connect_closing_closes_on_exception(tmp_path): + """Connection closed even when the body raises.""" + db_path = tmp_path / "kanban.db" + kb._INITIALIZED_PATHS.discard(str(db_path.resolve())) + captured = [] + with pytest.raises(RuntimeError, match="boom"): + with kb.connect_closing(db_path=db_path) as conn: + captured.append(conn) + raise RuntimeError("boom") + with pytest.raises(sqlite3.ProgrammingError): + captured[0].execute("SELECT 1") + + +def test_connect_closing_yields_usable_connection(tmp_path): + """Smoke test: schema is initialized and basic ops work.""" + db_path = tmp_path / "kanban.db" + kb._INITIALIZED_PATHS.discard(str(db_path.resolve())) + with kb.connect_closing(db_path=db_path) as conn: + tid = kb.create_task(conn, title="closing-cm test") + task = kb.get_task(conn, tid) + assert task is not None + assert task.title == "closing-cm test" + + +def test_bare_connect_does_not_close_on_context_exit(tmp_path): + """Document the leak that connect_closing exists to prevent. + + sqlite3.Connection's __exit__ commits/rollbacks but doesn't close. + This is the upstream behaviour we cannot change; the regression + guard is to make sure connect_closing() does the right thing. + """ + db_path = tmp_path / "kanban.db" + kb._INITIALIZED_PATHS.discard(str(db_path.resolve())) + with kb.connect(db_path=db_path) as conn: + pass + # Still usable after with-block exit (the leak). + conn.execute("SELECT 1").fetchone() + conn.close() # explicit close to avoid leaking THIS test