diff --git a/cron/jobs.py b/cron/jobs.py index 52d9367ff84..aefc22cc159 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -5,6 +5,7 @@ Jobs are stored in ~/.hermes/cron/jobs.json Output is saved to ~/.hermes/cron/output/{job_id}/{timestamp}.md """ +import contextlib import copy import json import logging @@ -14,6 +15,19 @@ import threading import os import re import uuid + +# Cross-process advisory file locking for jobs.json critical sections. +# fcntl is Unix-only; on Windows fall back to msvcrt. Either may be absent, +# in which case _jobs_lock() degrades to in-process locking only (the old +# behaviour) rather than failing. +try: + import fcntl +except ImportError: # pragma: no cover - non-Unix + fcntl = None +try: + import msvcrt +except ImportError: # pragma: no cover - non-Windows + msvcrt = None from datetime import datetime, timedelta from pathlib import Path from hermes_constants import get_hermes_home @@ -45,6 +59,56 @@ _jobs_file_lock = threading.Lock() OUTPUT_DIR = CRON_DIR / "output" ONESHOT_GRACE_SECONDS = 120 + +def _jobs_lock_file() -> Path: + """Return the advisory lock path for the current cron directory.""" + return CRON_DIR / ".jobs.lock" + + +@contextlib.contextmanager +def _jobs_lock(): + """Serialize a load_jobs→modify→save_jobs critical section. + + Combines the in-process threading lock (cheap mutual exclusion between + the gateway's parallel tick threads) with a cross-process advisory file + lock on ``/.jobs.lock`` (mutual exclusion between the gateway process + and standalone ``hermes`` CLI invocations, which previously shared no lock + at all — a `cron pause` could be silently clobbered by a concurrent + gateway write, leaving a "paused" job still firing). + + The flock is blocking, but every critical section that uses it is short + (field updates only — no agent execution), so contention resolves in + milliseconds. If neither fcntl nor msvcrt is available the manager still + provides in-process locking, matching the historical behaviour. + """ + with _jobs_file_lock: + lock_fd = None + try: + ensure_dirs() + lock_fd = open(_jobs_lock_file(), "w", encoding="utf-8") + if fcntl is not None: + fcntl.flock(lock_fd, fcntl.LOCK_EX) + elif msvcrt is not None: + msvcrt.locking(lock_fd.fileno(), msvcrt.LK_LOCK, 1) + except (OSError, IOError) as e: + # Never let a locking failure take down cron writes — fall back to + # in-process-only protection (still held via _jobs_file_lock). + logger.warning("jobs.json cross-process lock unavailable (%s); " + "proceeding with in-process lock only", e) + try: + yield + finally: + if lock_fd is not None: + try: + if fcntl is not None: + fcntl.flock(lock_fd, fcntl.LOCK_UN) + elif msvcrt is not None: + msvcrt.locking(lock_fd.fileno(), msvcrt.LK_UNLCK, 1) + except (OSError, IOError): + pass + finally: + lock_fd.close() + # Fields on a cron job that must never change after creation. ``id`` is used # as a filesystem path component under ``OUTPUT_DIR``; allowing it to be # updated lets an unsafe value (``../escape``, absolute path, nested) leak @@ -670,9 +734,10 @@ def create_job( "workdir": normalized_workdir, } - jobs = load_jobs() - jobs.append(job) - save_jobs(jobs) + with _jobs_lock(): + jobs = load_jobs() + jobs.append(job) + save_jobs(jobs) return job @@ -743,49 +808,50 @@ def update_job(job_id: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]] f"Cron job field(s) cannot be updated: {', '.join(sorted(bad_fields))}" ) - jobs = load_jobs() - for i, job in enumerate(jobs): - if job["id"] != job_id: - continue + with _jobs_lock(): + jobs = load_jobs() + for i, job in enumerate(jobs): + if job["id"] != job_id: + continue - # Validate / normalize workdir if present in updates. Empty string or - # None both mean "clear the field" (restore old behaviour). - if "workdir" in updates: - _wd = updates["workdir"] - if _wd in {None, "", False}: - updates["workdir"] = None - else: - updates["workdir"] = _normalize_workdir(_wd) + # Validate / normalize workdir if present in updates. Empty string + # or None both mean "clear the field" (restore old behaviour). + if "workdir" in updates: + _wd = updates["workdir"] + if _wd in {None, "", False}: + updates["workdir"] = None + else: + updates["workdir"] = _normalize_workdir(_wd) - updated = _apply_skill_fields({**job, **updates}) - schedule_changed = "schedule" in updates + updated = _apply_skill_fields({**job, **updates}) + schedule_changed = "schedule" in updates - if "skills" in updates or "skill" in updates: - normalized_skills = _normalize_skill_list(updated.get("skill"), updated.get("skills")) - updated["skills"] = normalized_skills - updated["skill"] = normalized_skills[0] if normalized_skills else None + if "skills" in updates or "skill" in updates: + normalized_skills = _normalize_skill_list(updated.get("skill"), updated.get("skills")) + updated["skills"] = normalized_skills + updated["skill"] = normalized_skills[0] if normalized_skills else None - if schedule_changed: - updated_schedule = updated["schedule"] - # The API may pass schedule as a raw string (e.g. "every 10m") - # instead of a pre-parsed dict. Normalize it the same way - # create_job() does so downstream code can call .get() safely. - if isinstance(updated_schedule, str): - updated_schedule = parse_schedule(updated_schedule) - updated["schedule"] = updated_schedule - updated["schedule_display"] = updates.get( - "schedule_display", - updated_schedule.get("display", updated.get("schedule_display")), - ) - if updated.get("state") != "paused": - updated["next_run_at"] = compute_next_run(updated_schedule) + if schedule_changed: + updated_schedule = updated["schedule"] + # The API may pass schedule as a raw string (e.g. "every 10m") + # instead of a pre-parsed dict. Normalize it the same way + # create_job() does so downstream code can call .get() safely. + if isinstance(updated_schedule, str): + updated_schedule = parse_schedule(updated_schedule) + updated["schedule"] = updated_schedule + updated["schedule_display"] = updates.get( + "schedule_display", + updated_schedule.get("display", updated.get("schedule_display")), + ) + if updated.get("state") != "paused": + updated["next_run_at"] = compute_next_run(updated_schedule) - if updated.get("enabled", True) and updated.get("state") != "paused" and not updated.get("next_run_at"): - updated["next_run_at"] = compute_next_run(updated["schedule"]) + if updated.get("enabled", True) and updated.get("state") != "paused" and not updated.get("next_run_at"): + updated["next_run_at"] = compute_next_run(updated["schedule"]) - jobs[i] = updated - save_jobs(jobs) - return _normalize_job_record(jobs[i]) + jobs[i] = updated + save_jobs(jobs) + return _normalize_job_record(jobs[i]) return None @@ -847,19 +913,20 @@ def remove_job(job_id: str) -> bool: if not job: return False canonical_id = job["id"] - jobs = load_jobs() - original_len = len(jobs) - jobs = [j for j in jobs if j["id"] != canonical_id] - if len(jobs) < original_len: - # Resolve the output dir BEFORE saving so a legacy unsafe ID (e.g. - # left over from before the create-time guard) fails closed without - # half-applying the removal. - job_output_dir = _job_output_dir(canonical_id) - save_jobs(jobs) - # Clean up output directory to prevent orphaned dirs accumulating - if job_output_dir.exists(): - shutil.rmtree(job_output_dir) - return True + with _jobs_lock(): + jobs = load_jobs() + original_len = len(jobs) + jobs = [j for j in jobs if j["id"] != canonical_id] + if len(jobs) < original_len: + # Resolve the output dir BEFORE saving so a legacy unsafe ID (e.g. + # left over from before the create-time guard) fails closed without + # half-applying the removal. + job_output_dir = _job_output_dir(canonical_id) + save_jobs(jobs) + # Clean up output directory to prevent orphaned dirs accumulating + if job_output_dir.exists(): + shutil.rmtree(job_output_dir) + return True return False @@ -874,7 +941,7 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None, ``delivery_error`` is tracked separately from the agent error — a job can succeed (agent produced output) but fail delivery (platform down). """ - with _jobs_file_lock: + with _jobs_lock(): jobs = load_jobs() for i, job in enumerate(jobs): if job["id"] == job_id: @@ -948,7 +1015,7 @@ def advance_next_run(job_id: str) -> bool: Returns True if next_run_at was advanced, False otherwise. """ - with _jobs_file_lock: + with _jobs_lock(): jobs = load_jobs() for job in jobs: if job["id"] == job_id: @@ -973,7 +1040,7 @@ def get_due_jobs() -> List[Dict[str, Any]]: the job is fast-forwarded to the next future run instead of firing immediately. This prevents a burst of missed jobs on gateway restart. """ - with _jobs_file_lock: + with _jobs_lock(): return _get_due_jobs_locked() @@ -1158,7 +1225,7 @@ def rewrite_skill_refs( if not consolidated and not pruned_set: return {"rewrites": [], "jobs_updated": 0, "jobs_scanned": 0} - with _jobs_file_lock: + with _jobs_lock(): jobs = load_jobs() rewrites: List[Dict[str, Any]] = [] changed = False diff --git a/tests/cron/test_jobs_crossprocess_lock.py b/tests/cron/test_jobs_crossprocess_lock.py new file mode 100644 index 00000000000..bae48ab0eae --- /dev/null +++ b/tests/cron/test_jobs_crossprocess_lock.py @@ -0,0 +1,82 @@ +"""Regression test for the jobs.json cross-process lock. + +Background: ``hermes cron pause`` runs in its own process (CLI → cronjob tool → +``pause_job`` → ``update_job`` → ``save_jobs``), entirely separate from the +gateway process that also writes ``jobs.json`` (``mark_job_run`` / +``advance_next_run`` / due-fast-forward). The module's ``threading.Lock`` only +serializes writers *inside one process*, so a CLI pause issued while the gateway +was live could be silently lost to a concurrent gateway write — the job kept +firing even though the CLI reported "Paused". + +``_jobs_lock()`` closes that gap with a short-held cross-process advisory file +lock. This test proves the lock actually excludes a *separate process*, which an +in-process ``threading.Lock`` cannot do. +""" + +import os +import subprocess +import sys +import textwrap +import time + +import pytest + +from cron import jobs +from hermes_constants import get_hermes_home + +# Repo root (parent of the ``cron`` package) so the child process can import it. +_REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(jobs.__file__))) + + +@pytest.mark.skipif(jobs.fcntl is None, reason="POSIX fcntl/flock required") +def test_jobs_lock_excludes_another_process(tmp_path): + ready = tmp_path / "child_holds_lock" + release = tmp_path / "child_may_release" + holder = tmp_path / "holder.py" + holder.write_text( + textwrap.dedent( + f""" + import sys, time, pathlib + sys.path.insert(0, {_REPO_ROOT!r}) + from cron import jobs + + with jobs._jobs_lock(): + pathlib.Path({str(ready)!r}).write_text("1") + # Hold the lock until the parent signals (bounded so a wedged + # test can never hang CI). + for _ in range(1000): + if pathlib.Path({str(release)!r}).exists(): + break + time.sleep(0.01) + """ + ) + ) + + child = subprocess.Popen([sys.executable, str(holder)]) + try: + # Wait until the child is inside the critical section. + for _ in range(1000): + if ready.exists(): + break + time.sleep(0.01) + assert ready.exists(), "child never acquired _jobs_lock()" + + # While the child holds it, a non-blocking acquire of the SAME lock file + # from this process must fail. A threading.Lock could never block here. + # Resolve the lock path at runtime (not jobs._JOBS_LOCK_FILE, which is + # bound at import time) so it matches the child even when the test suite + # redirects HERMES_HOME to a per-test tempdir. + lock_file = get_hermes_home() / "cron" / ".jobs.lock" + fd = os.open(str(lock_file), os.O_RDWR | os.O_CREAT) + try: + with pytest.raises(OSError): + jobs.fcntl.flock(fd, jobs.fcntl.LOCK_EX | jobs.fcntl.LOCK_NB) + finally: + os.close(fd) + finally: + release.write_text("1") + child.wait(timeout=15) + + # Once the child has released, the lock is freely acquirable again. + with jobs._jobs_lock(): + pass