diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index 2a0c279962d..9dc3262e16f 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -5657,9 +5657,14 @@ async def list_cron_job_runs(job_id: str, profile: Optional[str] = None, limit: Cron runs are stored as ordinary sessions whose id is ``cron_{job_id}_{timestamp}`` (see cron/scheduler.run_job). A job's history is therefore every session whose id carries that prefix; ``source='cron'`` - narrows it and the id substring binds it to this job. Powers the run-history + narrows it and the id prefix binds it to this job. Powers the run-history list under each job in the desktop cron detail. Same row shape as ``/api/sessions`` so the frontend can reuse SessionInfo. + + Backed by ``SessionDB.list_cron_job_runs`` — a bounded ``[prefix, hi)`` + id-range scan, not the compression-chain CTE used for the recents list, + so the cost scales with the requested window and not the (unbounded) total + cron history. """ selected = profile or _find_cron_job_profile(job_id) # job_id may be a human name; resolve to the canonical id used in run-session ids. @@ -5676,13 +5681,7 @@ async def list_cron_job_runs(job_id: str, profile: Optional[str] = None, limit: db = _open_session_db_for_profile(selected) try: - runs = db.list_sessions_rich( - source="cron", - id_query=f"cron_{canonical}_", - limit=limit_n, - offset=0, - order_by_last_active=True, - ) + runs = db.list_cron_job_runs(canonical, limit=limit_n, offset=0) now = time.time() for s in runs: s["is_active"] = ( diff --git a/hermes_state.py b/hermes_state.py index 90247a02a0e..d83580beeec 100644 --- a/hermes_state.py +++ b/hermes_state.py @@ -33,7 +33,7 @@ T = TypeVar("T") DEFAULT_DB_PATH = get_hermes_home() / "state.db" -SCHEMA_VERSION = 14 +SCHEMA_VERSION = 15 # --------------------------------------------------------------------------- # WAL-compatibility fallback @@ -302,6 +302,7 @@ CREATE TABLE IF NOT EXISTS compression_locks ( ); CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source); +CREATE INDEX IF NOT EXISTS idx_sessions_source_id ON sessions(source, id); CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id); CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC); CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp); @@ -1845,6 +1846,72 @@ class SessionDB: return sessions + def list_cron_job_runs( + self, + job_id: str, + limit: int = 20, + offset: int = 0, + ) -> List[Dict[str, Any]]: + """List the run sessions produced by a single cron job, newest first. + + Cron runs are flat, independent sessions whose id is + ``cron_{job_id}_{timestamp}`` (see ``cron/scheduler.run_job``). They are + never compression roots and never branch, so this deliberately skips the + ``list_sessions_rich`` recursive compression-chain CTE / leading-wildcard + ``id_query`` path — that path seeds from *every* ``source='cron'`` row in + the DB and only filters to one job's runs after the scan, so it scales + with the whole cron pile (a heavy history makes the desktop run-history + endpoint time out before it eventually populates). + + Instead this binds to one job with a ``[prefix, prefix_hi)`` range over + the id (an index range scan, not a ``%...%`` substring), filters + ``source='cron'``, and orders by ``started_at DESC``. Work scales with + the requested window, not the total cron history. + + Returns the same enriched row shape as ``list_sessions_rich`` (adds + ``preview`` + ``last_active``) so callers can reuse it. + """ + prefix = f"cron_{job_id}_" + # Half-open upper bound for an index range scan: increment the final + # byte of the prefix so the range covers exactly the ids that start + # with ``prefix`` and nothing else. ``prefix`` always ends in '_', but + # compute it generically rather than hardcoding the successor char. + prefix_hi = prefix[:-1] + chr(ord(prefix[-1]) + 1) + + query = """ + SELECT s.*, + COALESCE( + (SELECT SUBSTR(REPLACE(REPLACE(m.content, X'0A', ' '), X'0D', ' '), 1, 63) + FROM messages m + WHERE m.session_id = s.id AND m.role = 'user' AND m.content IS NOT NULL + ORDER BY m.timestamp, m.id LIMIT 1), + '' + ) AS _preview_raw, + COALESCE( + (SELECT MAX(m2.timestamp) FROM messages m2 WHERE m2.session_id = s.id), + s.started_at + ) AS last_active + FROM sessions s + WHERE s.source = 'cron' AND s.id >= ? AND s.id < ? + ORDER BY s.started_at DESC, s.id DESC + LIMIT ? OFFSET ? + """ + with self._lock: + cursor = self._conn.execute(query, (prefix, prefix_hi, limit, offset)) + rows = cursor.fetchall() + + runs: List[Dict[str, Any]] = [] + for row in rows: + s = dict(row) + raw = s.pop("_preview_raw", "").strip() + if raw: + text = raw[:60] + s["preview"] = text + ("..." if len(raw) > 60 else "") + else: + s["preview"] = "" + runs.append(s) + return runs + def _get_session_rich_row(self, session_id: str) -> Optional[Dict[str, Any]]: """Fetch a single session with the same enriched columns as ``list_sessions_rich`` (preview + last_active). Returns None if the diff --git a/tests/test_hermes_state.py b/tests/test_hermes_state.py index 52eab1bd99a..04334317705 100644 --- a/tests/test_hermes_state.py +++ b/tests/test_hermes_state.py @@ -3903,3 +3903,106 @@ class TestSessionIdSearch: assert [s["id"] for s in matches] == [tip] assert matches[0]["_lineage_root_id"] == root + + +class TestListCronJobRuns: + """``list_cron_job_runs`` powers the desktop cron run-history endpoint. + + It must scope to exactly one job's runs via an id prefix range (not a + substring), order newest-first, enrich with preview/last_active, and stay + bounded by the requested window rather than the whole cron history. + """ + + def _seed_run(self, db, job_id: str, idx: int, started_at: float): + sid = f"cron_{job_id}_{idx:08d}" + db.create_session(session_id=sid, source="cron") + db.append_message(sid, role="user", content=f"run {idx} for {job_id}") + db.append_message(sid, role="assistant", content="done") + db.end_session(sid, "completed") + db._conn.execute( + "UPDATE sessions SET started_at = ? WHERE id = ?", (started_at, sid) + ) + db._conn.commit() + return sid + + def test_scopes_to_job_newest_first_and_enriched(self, db): + base = 1_700_000_000.0 + # Target job: 5 runs, ascending started_at. + for i in range(5): + self._seed_run(db, "alpha", i, base + i * 60) + # A different job that must not leak in. + for i in range(3): + self._seed_run(db, "beta", i, base + i * 60) + + runs = db.list_cron_job_runs("alpha", limit=20) + + assert len(runs) == 5 + assert all(r["id"].startswith("cron_alpha_") for r in runs) + # Newest started_at first. + sts = [r["started_at"] for r in runs] + assert sts == sorted(sts, reverse=True) + # Enriched like list_sessions_rich. + assert runs[0]["preview"].startswith("run 4 for alpha") + assert runs[0]["last_active"] >= runs[0]["started_at"] + + def test_prefix_match_excludes_substring_collision(self, db): + """A job whose id contains the target id as a substring must not leak. + + The old code used a leading-wildcard ``LIKE %cron__%`` which would + also match ``cron_xalpha_...``; the range scan binds to the true prefix. + """ + base = 1_700_000_000.0 + self._seed_run(db, "alpha", 0, base) + # Collision: id is "xalpha", which contains "alpha". + self._seed_run(db, "xalpha", 0, base + 10) + # Collision the other way: id "alpha2" extends past the underscore. + self._seed_run(db, "alpha2", 0, base + 20) + + runs = db.list_cron_job_runs("alpha", limit=20) + + assert [r["id"] for r in runs] == ["cron_alpha_00000000"] + + def test_ignores_non_cron_sessions(self, db): + base = 1_700_000_000.0 + self._seed_run(db, "alpha", 0, base) + # A non-cron session whose id happens to share the prefix shape. + db.create_session(session_id="cron_alpha_99999999", source="cli") + db._conn.execute( + "UPDATE sessions SET started_at = ? WHERE id = ?", + (base + 100, "cron_alpha_99999999"), + ) + db._conn.commit() + + runs = db.list_cron_job_runs("alpha", limit=20) + + assert [r["id"] for r in runs] == ["cron_alpha_00000000"] + + def test_limit_and_offset_paging(self, db): + base = 1_700_000_000.0 + for i in range(10): + self._seed_run(db, "alpha", i, base + i * 60) + + page1 = db.list_cron_job_runs("alpha", limit=4, offset=0) + page2 = db.list_cron_job_runs("alpha", limit=4, offset=4) + + assert len(page1) == 4 + assert len(page2) == 4 + assert {r["id"] for r in page1}.isdisjoint({r["id"] for r in page2}) + # Combined window is still newest-first and contiguous. + combined = [r["started_at"] for r in page1 + page2] + assert combined == sorted(combined, reverse=True) + + def test_uses_index_range_scan(self, db): + """The query must use the (source, id) index, not a full table scan.""" + prefix = "cron_alpha_" + prefix_hi = prefix[:-1] + chr(ord(prefix[-1]) + 1) + plan = db._conn.execute( + "EXPLAIN QUERY PLAN " + "SELECT s.* FROM sessions s " + "WHERE s.source = 'cron' AND s.id >= ? AND s.id < ? " + "ORDER BY s.started_at DESC LIMIT 20", + (prefix, prefix_hi), + ).fetchall() + detail = " ".join(row[-1] for row in plan) + assert "USING INDEX" in detail or "USING COVERING INDEX" in detail, detail + assert "idx_sessions_source" in detail, detail