diff --git a/cron/jobs.py b/cron/jobs.py index 93098bd86b..93ad4c17fb 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -810,6 +810,12 @@ def get_due_jobs() -> List[Dict[str, Any]]: the job is fast-forwarded to the next future run instead of firing immediately. This prevents a burst of missed jobs on gateway restart. """ + with _jobs_file_lock: + return _get_due_jobs_locked() + + +def _get_due_jobs_locked() -> List[Dict[str, Any]]: + """Inner implementation of get_due_jobs(); must be called with _jobs_file_lock held.""" now = _hermes_now() raw_jobs = load_jobs() jobs = [_apply_skill_fields(j) for j in copy.deepcopy(raw_jobs)] diff --git a/tests/cron/test_jobs.py b/tests/cron/test_jobs.py index b9d34e1a5c..0405f997b1 100644 --- a/tests/cron/test_jobs.py +++ b/tests/cron/test_jobs.py @@ -1,6 +1,7 @@ """Tests for cron/jobs.py — schedule parsing, job CRUD, and due-job detection.""" import json +import threading import pytest from datetime import datetime, timedelta, timezone from pathlib import Path @@ -745,6 +746,100 @@ class TestEnabledToolsets: assert fetched["enabled_toolsets"] == ["web", "delegation"] +class TestMarkJobRunConcurrency: + """Regression tests for concurrent parallel job state writes. + + tick() dispatches multiple jobs to separate threads simultaneously. + Without _jobs_file_lock protecting the load→modify→save cycle in + mark_job_run(), concurrent writes can clobber each other's updates + (last-writer-wins), leaving some jobs with stale last_status / last_run_at. + """ + + def test_three_concurrent_mark_job_run_no_overwrites(self, tmp_cron_dir): + """Run mark_job_run() for 3 jobs in parallel threads; all must land correctly.""" + # Create 3 distinct recurring jobs + job_a = create_job(prompt="Job A", schedule="every 1h") + job_b = create_job(prompt="Job B", schedule="every 1h") + job_c = create_job(prompt="Job C", schedule="every 1h") + + errors: list = [] + + def run_mark(job_id: str, success: bool, error_msg=None): + try: + mark_job_run(job_id, success=success, error=error_msg) + except Exception as exc: # pragma: no cover + errors.append(exc) + + # Fire all three concurrently + threads = [ + threading.Thread(target=run_mark, args=(job_a["id"], True)), + threading.Thread(target=run_mark, args=(job_b["id"], False, "timeout")), + threading.Thread(target=run_mark, args=(job_c["id"], True)), + ] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors, f"Unexpected exceptions in worker threads: {errors}" + + # Verify each job has the correct state — no overwrites + a = get_job(job_a["id"]) + b = get_job(job_b["id"]) + c = get_job(job_c["id"]) + + assert a is not None, "Job A was unexpectedly deleted" + assert b is not None, "Job B was unexpectedly deleted" + assert c is not None, "Job C was unexpectedly deleted" + + assert a["last_status"] == "ok", f"Job A last_status wrong: {a['last_status']}" + assert a["last_run_at"] is not None, "Job A last_run_at not set" + assert a["repeat"]["completed"] == 1, f"Job A completed count wrong: {a['repeat']['completed']}" + + assert b["last_status"] == "error", f"Job B last_status wrong: {b['last_status']}" + assert b["last_error"] == "timeout", f"Job B last_error wrong: {b['last_error']}" + assert b["last_run_at"] is not None, "Job B last_run_at not set" + assert b["repeat"]["completed"] == 1, f"Job B completed count wrong: {b['repeat']['completed']}" + + assert c["last_status"] == "ok", f"Job C last_status wrong: {c['last_status']}" + assert c["last_run_at"] is not None, "Job C last_run_at not set" + assert c["repeat"]["completed"] == 1, f"Job C completed count wrong: {c['repeat']['completed']}" + + def test_repeated_concurrent_runs_accumulate_completed_count(self, tmp_cron_dir): + """Stress test: 10 threads each call mark_job_run on a different job once. + + The completed count for every job must be exactly 1 after all threads finish, + confirming no thread's write was silently dropped. + """ + n = 10 + jobs = [create_job(prompt=f"Stress job {i}", schedule="every 1h") for i in range(n)] + errors: list = [] + + def run_mark(job_id: str): + try: + mark_job_run(job_id, success=True) + except Exception as exc: # pragma: no cover + errors.append(exc) + + threads = [threading.Thread(target=run_mark, args=(j["id"],)) for j in jobs] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors, f"Unexpected exceptions: {errors}" + + for job in jobs: + updated = get_job(job["id"]) + assert updated is not None, f"Job {job['id']} was deleted" + assert updated["last_status"] == "ok", ( + f"Job {job['id']} has wrong last_status: {updated['last_status']}" + ) + assert updated["repeat"]["completed"] == 1, ( + f"Job {job['id']} completed count is {updated['repeat']['completed']}, expected 1" + ) + + class TestSaveJobOutput: def test_creates_output_file(self, tmp_cron_dir): output_file = save_job_output("test123", "# Results\nEverything ok.")