From ed81cfe3def71601f7f575e6a2a6b8cd95db2be7 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sun, 7 Jun 2026 02:41:01 -0700 Subject: [PATCH] fix(cron): bound the desktop run-history query to one job (#41088) The cron run-history endpoint (GET /api/cron/jobs/{id}/runs, added in #40684) reused list_sessions_rich's order_by_last_active path with a leading-wildcard id_query. That routes through the recursive compression-chain CTE, which seeds from EVERY source='cron' row in the DB and runs per-row preview/last_active subqueries before filtering to one job and applying LIMIT. Work scaled with the total cron history, so a large pile made the run-history load time out before eventually populating. Cron runs are flat, never-compressed sessions with ids of the form cron_{job_id}_{ts}, so the chain machinery is pure overhead and the job binding is a true prefix, not a substring. - New SessionDB.list_cron_job_runs(): bounded [prefix, hi) id-range scan on source='cron', ordered by started_at DESC, with the same preview/last_active enrichment. No CTE, no leading-wildcard LIKE. - Add idx_sessions_source(source, id) so the range is an index scan; bump SCHEMA_VERSION 14 -> 15 (index reconciles onto existing DBs via CREATE INDEX IF NOT EXISTS on startup). - Point the endpoint at the new method. Measured on a real SessionDB with 30k cron rows: 5ms vs 85ms for the old path (16x), and the new path stays flat as the pile grows while the old one scaled with it. Verified the query plan uses idx_sessions_source_id (range scan, no full table scan), runs are correctly scoped (substring collisions like cron_xalpha_ excluded), newest-first, and paged. --- hermes_cli/web_server.py | 15 +++--- hermes_state.py | 69 ++++++++++++++++++++++++- tests/test_hermes_state.py | 103 +++++++++++++++++++++++++++++++++++++ 3 files changed, 178 insertions(+), 9 deletions(-) diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index 2a0c279962d..9dc3262e16f 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -5657,9 +5657,14 @@ async def list_cron_job_runs(job_id: str, profile: Optional[str] = None, limit: Cron runs are stored as ordinary sessions whose id is ``cron_{job_id}_{timestamp}`` (see cron/scheduler.run_job). A job's history is therefore every session whose id carries that prefix; ``source='cron'`` - narrows it and the id substring binds it to this job. Powers the run-history + narrows it and the id prefix binds it to this job. Powers the run-history list under each job in the desktop cron detail. Same row shape as ``/api/sessions`` so the frontend can reuse SessionInfo. + + Backed by ``SessionDB.list_cron_job_runs`` — a bounded ``[prefix, hi)`` + id-range scan, not the compression-chain CTE used for the recents list, + so the cost scales with the requested window and not the (unbounded) total + cron history. """ selected = profile or _find_cron_job_profile(job_id) # job_id may be a human name; resolve to the canonical id used in run-session ids. @@ -5676,13 +5681,7 @@ async def list_cron_job_runs(job_id: str, profile: Optional[str] = None, limit: db = _open_session_db_for_profile(selected) try: - runs = db.list_sessions_rich( - source="cron", - id_query=f"cron_{canonical}_", - limit=limit_n, - offset=0, - order_by_last_active=True, - ) + runs = db.list_cron_job_runs(canonical, limit=limit_n, offset=0) now = time.time() for s in runs: s["is_active"] = ( diff --git a/hermes_state.py b/hermes_state.py index 90247a02a0e..d83580beeec 100644 --- a/hermes_state.py +++ b/hermes_state.py @@ -33,7 +33,7 @@ T = TypeVar("T") DEFAULT_DB_PATH = get_hermes_home() / "state.db" -SCHEMA_VERSION = 14 +SCHEMA_VERSION = 15 # --------------------------------------------------------------------------- # WAL-compatibility fallback @@ -302,6 +302,7 @@ CREATE TABLE IF NOT EXISTS compression_locks ( ); CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source); +CREATE INDEX IF NOT EXISTS idx_sessions_source_id ON sessions(source, id); CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id); CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC); CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp); @@ -1845,6 +1846,72 @@ class SessionDB: return sessions + def list_cron_job_runs( + self, + job_id: str, + limit: int = 20, + offset: int = 0, + ) -> List[Dict[str, Any]]: + """List the run sessions produced by a single cron job, newest first. + + Cron runs are flat, independent sessions whose id is + ``cron_{job_id}_{timestamp}`` (see ``cron/scheduler.run_job``). They are + never compression roots and never branch, so this deliberately skips the + ``list_sessions_rich`` recursive compression-chain CTE / leading-wildcard + ``id_query`` path — that path seeds from *every* ``source='cron'`` row in + the DB and only filters to one job's runs after the scan, so it scales + with the whole cron pile (a heavy history makes the desktop run-history + endpoint time out before it eventually populates). + + Instead this binds to one job with a ``[prefix, prefix_hi)`` range over + the id (an index range scan, not a ``%...%`` substring), filters + ``source='cron'``, and orders by ``started_at DESC``. Work scales with + the requested window, not the total cron history. + + Returns the same enriched row shape as ``list_sessions_rich`` (adds + ``preview`` + ``last_active``) so callers can reuse it. + """ + prefix = f"cron_{job_id}_" + # Half-open upper bound for an index range scan: increment the final + # byte of the prefix so the range covers exactly the ids that start + # with ``prefix`` and nothing else. ``prefix`` always ends in '_', but + # compute it generically rather than hardcoding the successor char. + prefix_hi = prefix[:-1] + chr(ord(prefix[-1]) + 1) + + query = """ + SELECT s.*, + COALESCE( + (SELECT SUBSTR(REPLACE(REPLACE(m.content, X'0A', ' '), X'0D', ' '), 1, 63) + FROM messages m + WHERE m.session_id = s.id AND m.role = 'user' AND m.content IS NOT NULL + ORDER BY m.timestamp, m.id LIMIT 1), + '' + ) AS _preview_raw, + COALESCE( + (SELECT MAX(m2.timestamp) FROM messages m2 WHERE m2.session_id = s.id), + s.started_at + ) AS last_active + FROM sessions s + WHERE s.source = 'cron' AND s.id >= ? AND s.id < ? + ORDER BY s.started_at DESC, s.id DESC + LIMIT ? OFFSET ? + """ + with self._lock: + cursor = self._conn.execute(query, (prefix, prefix_hi, limit, offset)) + rows = cursor.fetchall() + + runs: List[Dict[str, Any]] = [] + for row in rows: + s = dict(row) + raw = s.pop("_preview_raw", "").strip() + if raw: + text = raw[:60] + s["preview"] = text + ("..." if len(raw) > 60 else "") + else: + s["preview"] = "" + runs.append(s) + return runs + def _get_session_rich_row(self, session_id: str) -> Optional[Dict[str, Any]]: """Fetch a single session with the same enriched columns as ``list_sessions_rich`` (preview + last_active). Returns None if the diff --git a/tests/test_hermes_state.py b/tests/test_hermes_state.py index 52eab1bd99a..04334317705 100644 --- a/tests/test_hermes_state.py +++ b/tests/test_hermes_state.py @@ -3903,3 +3903,106 @@ class TestSessionIdSearch: assert [s["id"] for s in matches] == [tip] assert matches[0]["_lineage_root_id"] == root + + +class TestListCronJobRuns: + """``list_cron_job_runs`` powers the desktop cron run-history endpoint. + + It must scope to exactly one job's runs via an id prefix range (not a + substring), order newest-first, enrich with preview/last_active, and stay + bounded by the requested window rather than the whole cron history. + """ + + def _seed_run(self, db, job_id: str, idx: int, started_at: float): + sid = f"cron_{job_id}_{idx:08d}" + db.create_session(session_id=sid, source="cron") + db.append_message(sid, role="user", content=f"run {idx} for {job_id}") + db.append_message(sid, role="assistant", content="done") + db.end_session(sid, "completed") + db._conn.execute( + "UPDATE sessions SET started_at = ? WHERE id = ?", (started_at, sid) + ) + db._conn.commit() + return sid + + def test_scopes_to_job_newest_first_and_enriched(self, db): + base = 1_700_000_000.0 + # Target job: 5 runs, ascending started_at. + for i in range(5): + self._seed_run(db, "alpha", i, base + i * 60) + # A different job that must not leak in. + for i in range(3): + self._seed_run(db, "beta", i, base + i * 60) + + runs = db.list_cron_job_runs("alpha", limit=20) + + assert len(runs) == 5 + assert all(r["id"].startswith("cron_alpha_") for r in runs) + # Newest started_at first. + sts = [r["started_at"] for r in runs] + assert sts == sorted(sts, reverse=True) + # Enriched like list_sessions_rich. + assert runs[0]["preview"].startswith("run 4 for alpha") + assert runs[0]["last_active"] >= runs[0]["started_at"] + + def test_prefix_match_excludes_substring_collision(self, db): + """A job whose id contains the target id as a substring must not leak. + + The old code used a leading-wildcard ``LIKE %cron__%`` which would + also match ``cron_xalpha_...``; the range scan binds to the true prefix. + """ + base = 1_700_000_000.0 + self._seed_run(db, "alpha", 0, base) + # Collision: id is "xalpha", which contains "alpha". + self._seed_run(db, "xalpha", 0, base + 10) + # Collision the other way: id "alpha2" extends past the underscore. + self._seed_run(db, "alpha2", 0, base + 20) + + runs = db.list_cron_job_runs("alpha", limit=20) + + assert [r["id"] for r in runs] == ["cron_alpha_00000000"] + + def test_ignores_non_cron_sessions(self, db): + base = 1_700_000_000.0 + self._seed_run(db, "alpha", 0, base) + # A non-cron session whose id happens to share the prefix shape. + db.create_session(session_id="cron_alpha_99999999", source="cli") + db._conn.execute( + "UPDATE sessions SET started_at = ? WHERE id = ?", + (base + 100, "cron_alpha_99999999"), + ) + db._conn.commit() + + runs = db.list_cron_job_runs("alpha", limit=20) + + assert [r["id"] for r in runs] == ["cron_alpha_00000000"] + + def test_limit_and_offset_paging(self, db): + base = 1_700_000_000.0 + for i in range(10): + self._seed_run(db, "alpha", i, base + i * 60) + + page1 = db.list_cron_job_runs("alpha", limit=4, offset=0) + page2 = db.list_cron_job_runs("alpha", limit=4, offset=4) + + assert len(page1) == 4 + assert len(page2) == 4 + assert {r["id"] for r in page1}.isdisjoint({r["id"] for r in page2}) + # Combined window is still newest-first and contiguous. + combined = [r["started_at"] for r in page1 + page2] + assert combined == sorted(combined, reverse=True) + + def test_uses_index_range_scan(self, db): + """The query must use the (source, id) index, not a full table scan.""" + prefix = "cron_alpha_" + prefix_hi = prefix[:-1] + chr(ord(prefix[-1]) + 1) + plan = db._conn.execute( + "EXPLAIN QUERY PLAN " + "SELECT s.* FROM sessions s " + "WHERE s.source = 'cron' AND s.id >= ? AND s.id < ? " + "ORDER BY s.started_at DESC LIMIT 20", + (prefix, prefix_hi), + ).fetchall() + detail = " ".join(row[-1] for row in plan) + assert "USING INDEX" in detail or "USING COVERING INDEX" in detail, detail + assert "idx_sessions_source" in detail, detail