diff --git a/agent/curator_backup.py b/agent/curator_backup.py index 7725f1c71f3..944886d729a 100644 --- a/agent/curator_backup.py +++ b/agent/curator_backup.py @@ -454,16 +454,16 @@ def _restore_cron_skill_links(snapshot_dir: Path) -> Dict[str, Any]: report["attempted"] = True # we tried but there was nothing to do return report - # Load and rewrite the live jobs under the scheduler's lock. + # Load and rewrite the live jobs under the scheduler's cross-process lock. try: - from cron.jobs import load_jobs, save_jobs, _jobs_file_lock + from cron.jobs import load_jobs, save_jobs, _jobs_lock except ImportError as e: report["error"] = f"cron module unavailable: {e}" return report report["attempted"] = True try: - with _jobs_file_lock: + with _jobs_lock(): live_jobs = load_jobs() changed = False diff --git a/cron/jobs.py b/cron/jobs.py index aefc22cc159..178bd0fad81 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -55,7 +55,8 @@ JOBS_FILE = CRON_DIR / "jobs.json" # In-process lock protecting load_jobs→modify→save_jobs cycles. # Required when tick() runs jobs in parallel threads — without this, # concurrent mark_job_run / advance_next_run calls can clobber each other. -_jobs_file_lock = threading.Lock() +_jobs_file_lock = threading.RLock() +_jobs_lock_state = threading.local() OUTPUT_DIR = CRON_DIR / "output" ONESHOT_GRACE_SECONDS = 120 @@ -80,34 +81,52 @@ def _jobs_lock(): (field updates only — no agent execution), so contention resolves in milliseconds. If neither fcntl nor msvcrt is available the manager still provides in-process locking, matching the historical behaviour. + + Nested calls in the same thread reuse the held lock so legacy callers that + invoke save_jobs() inside a broader mutation section don't deadlock or try + to reacquire the advisory file lock. """ - with _jobs_file_lock: - lock_fd = None - try: - ensure_dirs() - lock_fd = open(_jobs_lock_file(), "w", encoding="utf-8") - if fcntl is not None: - fcntl.flock(lock_fd, fcntl.LOCK_EX) - elif msvcrt is not None: - msvcrt.locking(lock_fd.fileno(), msvcrt.LK_LOCK, 1) - except (OSError, IOError) as e: - # Never let a locking failure take down cron writes — fall back to - # in-process-only protection (still held via _jobs_file_lock). - logger.warning("jobs.json cross-process lock unavailable (%s); " - "proceeding with in-process lock only", e) + depth = getattr(_jobs_lock_state, "depth", 0) + if depth: + _jobs_lock_state.depth = depth + 1 try: yield finally: - if lock_fd is not None: - try: - if fcntl is not None: - fcntl.flock(lock_fd, fcntl.LOCK_UN) - elif msvcrt is not None: - msvcrt.locking(lock_fd.fileno(), msvcrt.LK_UNLCK, 1) - except (OSError, IOError): - pass - finally: - lock_fd.close() + _jobs_lock_state.depth -= 1 + return + + with _jobs_file_lock: + _jobs_lock_state.depth = 1 + lock_fd = None + try: + try: + ensure_dirs() + lock_fd = open(_jobs_lock_file(), "a+", encoding="utf-8") + lock_fd.seek(0) + if fcntl is not None: + fcntl.flock(lock_fd, fcntl.LOCK_EX) + elif msvcrt is not None: + getattr(msvcrt, "locking")(lock_fd.fileno(), getattr(msvcrt, "LK_LOCK"), 1) + except (OSError, IOError) as e: + # Never let a locking failure take down cron writes — fall back to + # in-process-only protection (still held via _jobs_file_lock). + logger.warning("jobs.json cross-process lock unavailable (%s); " + "proceeding with in-process lock only", e) + try: + yield + finally: + if lock_fd is not None: + try: + if fcntl is not None: + fcntl.flock(lock_fd, fcntl.LOCK_UN) + elif msvcrt is not None: + getattr(msvcrt, "locking")(lock_fd.fileno(), getattr(msvcrt, "LK_UNLCK"), 1) + except (OSError, IOError): + pass + finally: + lock_fd.close() + finally: + _jobs_lock_state.depth = 0 # Fields on a cron job that must never change after creation. ``id`` is used # as a filesystem path component under ``OUTPUT_DIR``; allowing it to be @@ -532,8 +551,8 @@ def load_jobs() -> List[Dict[str, Any]]: ) -def save_jobs(jobs: List[Dict[str, Any]]): - """Save all jobs to storage.""" +def _save_jobs_unlocked(jobs: List[Dict[str, Any]]): + """Save all jobs to storage. Caller must hold _jobs_lock().""" ensure_dirs() fd, tmp_path = tempfile.mkstemp(dir=str(JOBS_FILE.parent), suffix='.tmp', prefix='.jobs_') try: @@ -551,6 +570,12 @@ def save_jobs(jobs: List[Dict[str, Any]]): raise +def save_jobs(jobs: List[Dict[str, Any]]): + """Save all jobs to storage.""" + with _jobs_lock(): + _save_jobs_unlocked(jobs) + + def _normalize_workdir(workdir: Optional[str]) -> Optional[str]: """Normalize and validate a cron job workdir. @@ -1045,7 +1070,7 @@ def get_due_jobs() -> List[Dict[str, Any]]: def _get_due_jobs_locked() -> List[Dict[str, Any]]: - """Inner implementation of get_due_jobs(); must be called with _jobs_file_lock held.""" + """Inner implementation of get_due_jobs(); must be called with _jobs_lock held.""" now = _hermes_now() raw_jobs = load_jobs() jobs = [_apply_skill_fields(j) for j in copy.deepcopy(raw_jobs)] diff --git a/scripts/release.py b/scripts/release.py index 6ab49a40953..e5b0fd2226d 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -89,6 +89,7 @@ AUTHOR_MAP = { "tharushkadinujaya05@gmail.com": "0xneobyte", "138671361+Veritas-7@users.noreply.github.com": "Veritas-7", "keiron@onehanded.com": "kmccammon", + "268233388+CiarasClaws@users.noreply.github.com": "CiarasClaws", "895252509@qq.com": "895252509", "35259607+zxcasongs@users.noreply.github.com": "zxcasongs", "alfred@my-cloud.me": "alfred-smith-0", diff --git a/tests/cron/test_jobs_crossprocess_lock.py b/tests/cron/test_jobs_crossprocess_lock.py index bae48ab0eae..97c0aa77cf0 100644 --- a/tests/cron/test_jobs_crossprocess_lock.py +++ b/tests/cron/test_jobs_crossprocess_lock.py @@ -22,16 +22,24 @@ import time import pytest from cron import jobs -from hermes_constants import get_hermes_home + # Repo root (parent of the ``cron`` package) so the child process can import it. _REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(jobs.__file__))) @pytest.mark.skipif(jobs.fcntl is None, reason="POSIX fcntl/flock required") -def test_jobs_lock_excludes_another_process(tmp_path): +def test_jobs_lock_excludes_another_process(tmp_path, monkeypatch): + cron_dir = tmp_path / "cron" + output_dir = cron_dir / "output" + monkeypatch.setattr(jobs, "CRON_DIR", cron_dir) + monkeypatch.setattr(jobs, "JOBS_FILE", cron_dir / "jobs.json") + monkeypatch.setattr(jobs, "OUTPUT_DIR", output_dir) + ready = tmp_path / "child_holds_lock" release = tmp_path / "child_may_release" + blocker_started = tmp_path / "blocker_started" + blocker_acquired = tmp_path / "blocker_acquired" holder = tmp_path / "holder.py" holder.write_text( textwrap.dedent( @@ -40,6 +48,10 @@ def test_jobs_lock_excludes_another_process(tmp_path): sys.path.insert(0, {_REPO_ROOT!r}) from cron import jobs + jobs.CRON_DIR = pathlib.Path({str(cron_dir)!r}) + jobs.JOBS_FILE = jobs.CRON_DIR / "jobs.json" + jobs.OUTPUT_DIR = jobs.CRON_DIR / "output" + with jobs._jobs_lock(): pathlib.Path({str(ready)!r}).write_text("1") # Hold the lock until the parent signals (bounded so a wedged @@ -52,7 +64,27 @@ def test_jobs_lock_excludes_another_process(tmp_path): ) ) + blocker = tmp_path / "blocker.py" + blocker.write_text( + textwrap.dedent( + f""" + import sys, pathlib + sys.path.insert(0, {_REPO_ROOT!r}) + from cron import jobs + + jobs.CRON_DIR = pathlib.Path({str(cron_dir)!r}) + jobs.JOBS_FILE = jobs.CRON_DIR / "jobs.json" + jobs.OUTPUT_DIR = jobs.CRON_DIR / "output" + + pathlib.Path({str(blocker_started)!r}).write_text("1") + with jobs._jobs_lock(): + pathlib.Path({str(blocker_acquired)!r}).write_text("1") + """ + ) + ) + child = subprocess.Popen([sys.executable, str(holder)]) + blocker_child = None try: # Wait until the child is inside the critical section. for _ in range(1000): @@ -63,19 +95,32 @@ def test_jobs_lock_excludes_another_process(tmp_path): # While the child holds it, a non-blocking acquire of the SAME lock file # from this process must fail. A threading.Lock could never block here. - # Resolve the lock path at runtime (not jobs._JOBS_LOCK_FILE, which is - # bound at import time) so it matches the child even when the test suite - # redirects HERMES_HOME to a per-test tempdir. - lock_file = get_hermes_home() / "cron" / ".jobs.lock" + lock_file = jobs._jobs_lock_file() fd = os.open(str(lock_file), os.O_RDWR | os.O_CREAT) try: with pytest.raises(OSError): jobs.fcntl.flock(fd, jobs.fcntl.LOCK_EX | jobs.fcntl.LOCK_NB) finally: os.close(fd) + + # A second _jobs_lock() caller in another process should block until the + # holder releases, rather than falling through with only a process-local + # threading lock. + blocker_child = subprocess.Popen([sys.executable, str(blocker)]) + for _ in range(1000): + if blocker_started.exists(): + break + time.sleep(0.01) + assert blocker_started.exists(), "blocker process never started" + time.sleep(0.05) + assert not blocker_acquired.exists(), "second process entered _jobs_lock() while held" finally: release.write_text("1") child.wait(timeout=15) + if blocker_child is not None: + blocker_child.wait(timeout=15) + + assert blocker_acquired.exists(), "second process did not acquire _jobs_lock() after release" # Once the child has released, the lock is freely acquirable again. with jobs._jobs_lock():