mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-09 08:21:50 +00:00
fix(cron): bound the desktop run-history query to one job (#41088)
The cron run-history endpoint (GET /api/cron/jobs/{id}/runs, added in
#40684) reused list_sessions_rich's order_by_last_active path with a
leading-wildcard id_query. That routes through the recursive
compression-chain CTE, which seeds from EVERY source='cron' row in the DB
and runs per-row preview/last_active subqueries before filtering to one
job and applying LIMIT. Work scaled with the total cron history, so a
large pile made the run-history load time out before eventually
populating.
Cron runs are flat, never-compressed sessions with ids of the form
cron_{job_id}_{ts}, so the chain machinery is pure overhead and the
job binding is a true prefix, not a substring.
- New SessionDB.list_cron_job_runs(): bounded [prefix, hi) id-range scan
on source='cron', ordered by started_at DESC, with the same
preview/last_active enrichment. No CTE, no leading-wildcard LIKE.
- Add idx_sessions_source(source, id) so the range is an index scan;
bump SCHEMA_VERSION 14 -> 15 (index reconciles onto existing DBs via
CREATE INDEX IF NOT EXISTS on startup).
- Point the endpoint at the new method.
Measured on a real SessionDB with 30k cron rows: 5ms vs 85ms for the old
path (16x), and the new path stays flat as the pile grows while the old
one scaled with it. Verified the query plan uses idx_sessions_source_id
(range scan, no full table scan), runs are correctly scoped (substring
collisions like cron_xalpha_ excluded), newest-first, and paged.
This commit is contained in:
parent
5a3092b601
commit
ed81cfe3de
3 changed files with 178 additions and 9 deletions
|
|
@ -5657,9 +5657,14 @@ async def list_cron_job_runs(job_id: str, profile: Optional[str] = None, limit:
|
|||
Cron runs are stored as ordinary sessions whose id is
|
||||
``cron_{job_id}_{timestamp}`` (see cron/scheduler.run_job). A job's history
|
||||
is therefore every session whose id carries that prefix; ``source='cron'``
|
||||
narrows it and the id substring binds it to this job. Powers the run-history
|
||||
narrows it and the id prefix binds it to this job. Powers the run-history
|
||||
list under each job in the desktop cron detail. Same row shape as
|
||||
``/api/sessions`` so the frontend can reuse SessionInfo.
|
||||
|
||||
Backed by ``SessionDB.list_cron_job_runs`` — a bounded ``[prefix, hi)``
|
||||
id-range scan, not the compression-chain CTE used for the recents list,
|
||||
so the cost scales with the requested window and not the (unbounded) total
|
||||
cron history.
|
||||
"""
|
||||
selected = profile or _find_cron_job_profile(job_id)
|
||||
# job_id may be a human name; resolve to the canonical id used in run-session ids.
|
||||
|
|
@ -5676,13 +5681,7 @@ async def list_cron_job_runs(job_id: str, profile: Optional[str] = None, limit:
|
|||
|
||||
db = _open_session_db_for_profile(selected)
|
||||
try:
|
||||
runs = db.list_sessions_rich(
|
||||
source="cron",
|
||||
id_query=f"cron_{canonical}_",
|
||||
limit=limit_n,
|
||||
offset=0,
|
||||
order_by_last_active=True,
|
||||
)
|
||||
runs = db.list_cron_job_runs(canonical, limit=limit_n, offset=0)
|
||||
now = time.time()
|
||||
for s in runs:
|
||||
s["is_active"] = (
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ T = TypeVar("T")
|
|||
|
||||
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
|
||||
|
||||
SCHEMA_VERSION = 14
|
||||
SCHEMA_VERSION = 15
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# WAL-compatibility fallback
|
||||
|
|
@ -302,6 +302,7 @@ CREATE TABLE IF NOT EXISTS compression_locks (
|
|||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_source_id ON sessions(source, id);
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
|
||||
|
|
@ -1845,6 +1846,72 @@ class SessionDB:
|
|||
|
||||
return sessions
|
||||
|
||||
def list_cron_job_runs(
|
||||
self,
|
||||
job_id: str,
|
||||
limit: int = 20,
|
||||
offset: int = 0,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""List the run sessions produced by a single cron job, newest first.
|
||||
|
||||
Cron runs are flat, independent sessions whose id is
|
||||
``cron_{job_id}_{timestamp}`` (see ``cron/scheduler.run_job``). They are
|
||||
never compression roots and never branch, so this deliberately skips the
|
||||
``list_sessions_rich`` recursive compression-chain CTE / leading-wildcard
|
||||
``id_query`` path — that path seeds from *every* ``source='cron'`` row in
|
||||
the DB and only filters to one job's runs after the scan, so it scales
|
||||
with the whole cron pile (a heavy history makes the desktop run-history
|
||||
endpoint time out before it eventually populates).
|
||||
|
||||
Instead this binds to one job with a ``[prefix, prefix_hi)`` range over
|
||||
the id (an index range scan, not a ``%...%`` substring), filters
|
||||
``source='cron'``, and orders by ``started_at DESC``. Work scales with
|
||||
the requested window, not the total cron history.
|
||||
|
||||
Returns the same enriched row shape as ``list_sessions_rich`` (adds
|
||||
``preview`` + ``last_active``) so callers can reuse it.
|
||||
"""
|
||||
prefix = f"cron_{job_id}_"
|
||||
# Half-open upper bound for an index range scan: increment the final
|
||||
# byte of the prefix so the range covers exactly the ids that start
|
||||
# with ``prefix`` and nothing else. ``prefix`` always ends in '_', but
|
||||
# compute it generically rather than hardcoding the successor char.
|
||||
prefix_hi = prefix[:-1] + chr(ord(prefix[-1]) + 1)
|
||||
|
||||
query = """
|
||||
SELECT s.*,
|
||||
COALESCE(
|
||||
(SELECT SUBSTR(REPLACE(REPLACE(m.content, X'0A', ' '), X'0D', ' '), 1, 63)
|
||||
FROM messages m
|
||||
WHERE m.session_id = s.id AND m.role = 'user' AND m.content IS NOT NULL
|
||||
ORDER BY m.timestamp, m.id LIMIT 1),
|
||||
''
|
||||
) AS _preview_raw,
|
||||
COALESCE(
|
||||
(SELECT MAX(m2.timestamp) FROM messages m2 WHERE m2.session_id = s.id),
|
||||
s.started_at
|
||||
) AS last_active
|
||||
FROM sessions s
|
||||
WHERE s.source = 'cron' AND s.id >= ? AND s.id < ?
|
||||
ORDER BY s.started_at DESC, s.id DESC
|
||||
LIMIT ? OFFSET ?
|
||||
"""
|
||||
with self._lock:
|
||||
cursor = self._conn.execute(query, (prefix, prefix_hi, limit, offset))
|
||||
rows = cursor.fetchall()
|
||||
|
||||
runs: List[Dict[str, Any]] = []
|
||||
for row in rows:
|
||||
s = dict(row)
|
||||
raw = s.pop("_preview_raw", "").strip()
|
||||
if raw:
|
||||
text = raw[:60]
|
||||
s["preview"] = text + ("..." if len(raw) > 60 else "")
|
||||
else:
|
||||
s["preview"] = ""
|
||||
runs.append(s)
|
||||
return runs
|
||||
|
||||
def _get_session_rich_row(self, session_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Fetch a single session with the same enriched columns as
|
||||
``list_sessions_rich`` (preview + last_active). Returns None if the
|
||||
|
|
|
|||
|
|
@ -3903,3 +3903,106 @@ class TestSessionIdSearch:
|
|||
|
||||
assert [s["id"] for s in matches] == [tip]
|
||||
assert matches[0]["_lineage_root_id"] == root
|
||||
|
||||
|
||||
class TestListCronJobRuns:
|
||||
"""``list_cron_job_runs`` powers the desktop cron run-history endpoint.
|
||||
|
||||
It must scope to exactly one job's runs via an id prefix range (not a
|
||||
substring), order newest-first, enrich with preview/last_active, and stay
|
||||
bounded by the requested window rather than the whole cron history.
|
||||
"""
|
||||
|
||||
def _seed_run(self, db, job_id: str, idx: int, started_at: float):
|
||||
sid = f"cron_{job_id}_{idx:08d}"
|
||||
db.create_session(session_id=sid, source="cron")
|
||||
db.append_message(sid, role="user", content=f"run {idx} for {job_id}")
|
||||
db.append_message(sid, role="assistant", content="done")
|
||||
db.end_session(sid, "completed")
|
||||
db._conn.execute(
|
||||
"UPDATE sessions SET started_at = ? WHERE id = ?", (started_at, sid)
|
||||
)
|
||||
db._conn.commit()
|
||||
return sid
|
||||
|
||||
def test_scopes_to_job_newest_first_and_enriched(self, db):
|
||||
base = 1_700_000_000.0
|
||||
# Target job: 5 runs, ascending started_at.
|
||||
for i in range(5):
|
||||
self._seed_run(db, "alpha", i, base + i * 60)
|
||||
# A different job that must not leak in.
|
||||
for i in range(3):
|
||||
self._seed_run(db, "beta", i, base + i * 60)
|
||||
|
||||
runs = db.list_cron_job_runs("alpha", limit=20)
|
||||
|
||||
assert len(runs) == 5
|
||||
assert all(r["id"].startswith("cron_alpha_") for r in runs)
|
||||
# Newest started_at first.
|
||||
sts = [r["started_at"] for r in runs]
|
||||
assert sts == sorted(sts, reverse=True)
|
||||
# Enriched like list_sessions_rich.
|
||||
assert runs[0]["preview"].startswith("run 4 for alpha")
|
||||
assert runs[0]["last_active"] >= runs[0]["started_at"]
|
||||
|
||||
def test_prefix_match_excludes_substring_collision(self, db):
|
||||
"""A job whose id contains the target id as a substring must not leak.
|
||||
|
||||
The old code used a leading-wildcard ``LIKE %cron_<id>_%`` which would
|
||||
also match ``cron_xalpha_...``; the range scan binds to the true prefix.
|
||||
"""
|
||||
base = 1_700_000_000.0
|
||||
self._seed_run(db, "alpha", 0, base)
|
||||
# Collision: id is "xalpha", which contains "alpha".
|
||||
self._seed_run(db, "xalpha", 0, base + 10)
|
||||
# Collision the other way: id "alpha2" extends past the underscore.
|
||||
self._seed_run(db, "alpha2", 0, base + 20)
|
||||
|
||||
runs = db.list_cron_job_runs("alpha", limit=20)
|
||||
|
||||
assert [r["id"] for r in runs] == ["cron_alpha_00000000"]
|
||||
|
||||
def test_ignores_non_cron_sessions(self, db):
|
||||
base = 1_700_000_000.0
|
||||
self._seed_run(db, "alpha", 0, base)
|
||||
# A non-cron session whose id happens to share the prefix shape.
|
||||
db.create_session(session_id="cron_alpha_99999999", source="cli")
|
||||
db._conn.execute(
|
||||
"UPDATE sessions SET started_at = ? WHERE id = ?",
|
||||
(base + 100, "cron_alpha_99999999"),
|
||||
)
|
||||
db._conn.commit()
|
||||
|
||||
runs = db.list_cron_job_runs("alpha", limit=20)
|
||||
|
||||
assert [r["id"] for r in runs] == ["cron_alpha_00000000"]
|
||||
|
||||
def test_limit_and_offset_paging(self, db):
|
||||
base = 1_700_000_000.0
|
||||
for i in range(10):
|
||||
self._seed_run(db, "alpha", i, base + i * 60)
|
||||
|
||||
page1 = db.list_cron_job_runs("alpha", limit=4, offset=0)
|
||||
page2 = db.list_cron_job_runs("alpha", limit=4, offset=4)
|
||||
|
||||
assert len(page1) == 4
|
||||
assert len(page2) == 4
|
||||
assert {r["id"] for r in page1}.isdisjoint({r["id"] for r in page2})
|
||||
# Combined window is still newest-first and contiguous.
|
||||
combined = [r["started_at"] for r in page1 + page2]
|
||||
assert combined == sorted(combined, reverse=True)
|
||||
|
||||
def test_uses_index_range_scan(self, db):
|
||||
"""The query must use the (source, id) index, not a full table scan."""
|
||||
prefix = "cron_alpha_"
|
||||
prefix_hi = prefix[:-1] + chr(ord(prefix[-1]) + 1)
|
||||
plan = db._conn.execute(
|
||||
"EXPLAIN QUERY PLAN "
|
||||
"SELECT s.* FROM sessions s "
|
||||
"WHERE s.source = 'cron' AND s.id >= ? AND s.id < ? "
|
||||
"ORDER BY s.started_at DESC LIMIT 20",
|
||||
(prefix, prefix_hi),
|
||||
).fetchall()
|
||||
detail = " ".join(row[-1] for row in plan)
|
||||
assert "USING INDEX" in detail or "USING COVERING INDEX" in detail, detail
|
||||
assert "idx_sessions_source" in detail, detail
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue