fix(cron): serialise get_due_jobs() under _jobs_file_lock; add concurrency regression tests

get_due_jobs() called load_jobs() and save_jobs() without holding
_jobs_file_lock, creating a race with the locked mark_job_run() and
advance_next_run(). Wrap get_due_jobs() with the lock (delegating to a
new _get_due_jobs_locked() inner function) so all load→modify→save
cycles are serialised. Add two regression tests: one verifying 3
concurrent mark_job_run() calls each land their correct last_status and
last_run_at without overwrites, and a stress test confirming 10 parallel
calls each increment their job's completed count to exactly 1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Ioodu 2026-04-27 19:08:53 +08:00 committed by Teknium
parent 6875471916
commit 1c7f47a58c
2 changed files with 101 additions and 0 deletions

View file

@ -810,6 +810,12 @@ def get_due_jobs() -> List[Dict[str, Any]]:
the job is fast-forwarded to the next future run instead of firing the job is fast-forwarded to the next future run instead of firing
immediately. This prevents a burst of missed jobs on gateway restart. immediately. This prevents a burst of missed jobs on gateway restart.
""" """
with _jobs_file_lock:
return _get_due_jobs_locked()
def _get_due_jobs_locked() -> List[Dict[str, Any]]:
"""Inner implementation of get_due_jobs(); must be called with _jobs_file_lock held."""
now = _hermes_now() now = _hermes_now()
raw_jobs = load_jobs() raw_jobs = load_jobs()
jobs = [_apply_skill_fields(j) for j in copy.deepcopy(raw_jobs)] jobs = [_apply_skill_fields(j) for j in copy.deepcopy(raw_jobs)]

View file

@ -1,6 +1,7 @@
"""Tests for cron/jobs.py — schedule parsing, job CRUD, and due-job detection.""" """Tests for cron/jobs.py — schedule parsing, job CRUD, and due-job detection."""
import json import json
import threading
import pytest import pytest
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from pathlib import Path from pathlib import Path
@ -745,6 +746,100 @@ class TestEnabledToolsets:
assert fetched["enabled_toolsets"] == ["web", "delegation"] assert fetched["enabled_toolsets"] == ["web", "delegation"]
class TestMarkJobRunConcurrency:
    """Regression tests for concurrent parallel job state writes.

    tick() dispatches multiple jobs to separate threads simultaneously.
    Without _jobs_file_lock protecting the load -> modify -> save cycle in
    mark_job_run(), concurrent writes can clobber each other's updates
    (last-writer-wins), leaving some jobs with stale last_status /
    last_run_at.
    """

    def test_three_concurrent_mark_job_run_no_overwrites(self, tmp_cron_dir):
        """Run mark_job_run() for 3 jobs in parallel threads; all must land correctly."""
        # Create 3 distinct recurring jobs.
        job_a = create_job(prompt="Job A", schedule="every 1h")
        job_b = create_job(prompt="Job B", schedule="every 1h")
        job_c = create_job(prompt="Job C", schedule="every 1h")

        # Collect worker-thread exceptions here; an assert inside a thread
        # would otherwise be swallowed and the test would pass vacuously.
        errors: list = []

        def run_mark(job_id: str, success: bool, error_msg=None):
            try:
                mark_job_run(job_id, success=success, error=error_msg)
            except Exception as exc:  # pragma: no cover
                errors.append(exc)

        # Fire all three concurrently.
        threads = [
            threading.Thread(target=run_mark, args=(job_a["id"], True)),
            threading.Thread(target=run_mark, args=(job_b["id"], False, "timeout")),
            threading.Thread(target=run_mark, args=(job_c["id"], True)),
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert not errors, f"Unexpected exceptions in worker threads: {errors}"

        # Verify each job has the correct state — no overwrites.
        a = get_job(job_a["id"])
        b = get_job(job_b["id"])
        c = get_job(job_c["id"])
        assert a is not None, "Job A was unexpectedly deleted"
        assert b is not None, "Job B was unexpectedly deleted"
        assert c is not None, "Job C was unexpectedly deleted"

        assert a["last_status"] == "ok", f"Job A last_status wrong: {a['last_status']}"
        assert a["last_run_at"] is not None, "Job A last_run_at not set"
        assert a["repeat"]["completed"] == 1, f"Job A completed count wrong: {a['repeat']['completed']}"

        assert b["last_status"] == "error", f"Job B last_status wrong: {b['last_status']}"
        assert b["last_error"] == "timeout", f"Job B last_error wrong: {b['last_error']}"
        assert b["last_run_at"] is not None, "Job B last_run_at not set"
        assert b["repeat"]["completed"] == 1, f"Job B completed count wrong: {b['repeat']['completed']}"

        assert c["last_status"] == "ok", f"Job C last_status wrong: {c['last_status']}"
        assert c["last_run_at"] is not None, "Job C last_run_at not set"
        assert c["repeat"]["completed"] == 1, f"Job C completed count wrong: {c['repeat']['completed']}"

    def test_repeated_concurrent_runs_accumulate_completed_count(self, tmp_cron_dir):
        """Stress test: 10 threads each call mark_job_run on a different job once.

        The completed count for every job must be exactly 1 after all threads
        finish, confirming no thread's write was silently dropped.
        """
        n = 10
        jobs = [create_job(prompt=f"Stress job {i}", schedule="every 1h") for i in range(n)]

        errors: list = []

        def run_mark(job_id: str):
            try:
                mark_job_run(job_id, success=True)
            except Exception as exc:  # pragma: no cover
                errors.append(exc)

        threads = [threading.Thread(target=run_mark, args=(j["id"],)) for j in jobs]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert not errors, f"Unexpected exceptions: {errors}"

        for job in jobs:
            updated = get_job(job["id"])
            assert updated is not None, f"Job {job['id']} was deleted"
            assert updated["last_status"] == "ok", (
                f"Job {job['id']} has wrong last_status: {updated['last_status']}"
            )
            assert updated["repeat"]["completed"] == 1, (
                f"Job {job['id']} completed count is {updated['repeat']['completed']}, expected 1"
            )
class TestSaveJobOutput: class TestSaveJobOutput:
def test_creates_output_file(self, tmp_cron_dir): def test_creates_output_file(self, tmp_cron_dir):
output_file = save_job_output("test123", "# Results\nEverything ok.") output_file = save_job_output("test123", "# Results\nEverything ok.")