fix: complete cron jobs lock salvage

Route curator rollback through the same cross-process cron job lock, make save_jobs lock for legacy direct callers without deadlocking nested mutation paths, and harden the regression test so a second _jobs_lock caller really blocks across processes.
This commit is contained in:
Teknium 2026-06-15 05:20:59 -07:00
parent e5b4cf7bea
commit 733472952a
4 changed files with 108 additions and 37 deletions

View file

@ -454,16 +454,16 @@ def _restore_cron_skill_links(snapshot_dir: Path) -> Dict[str, Any]:
report["attempted"] = True # we tried but there was nothing to do
return report
# Load and rewrite the live jobs under the scheduler's lock.
# Load and rewrite the live jobs under the scheduler's cross-process lock.
try:
from cron.jobs import load_jobs, save_jobs, _jobs_file_lock
from cron.jobs import load_jobs, save_jobs, _jobs_lock
except ImportError as e:
report["error"] = f"cron module unavailable: {e}"
return report
report["attempted"] = True
try:
with _jobs_file_lock:
with _jobs_lock():
live_jobs = load_jobs()
changed = False

View file

@ -55,7 +55,8 @@ JOBS_FILE = CRON_DIR / "jobs.json"
# In-process lock protecting load_jobs→modify→save_jobs cycles.
# Required when tick() runs jobs in parallel threads — without this,
# concurrent mark_job_run / advance_next_run calls can clobber each other.
_jobs_file_lock = threading.Lock()
_jobs_file_lock = threading.RLock()
_jobs_lock_state = threading.local()
OUTPUT_DIR = CRON_DIR / "output"
ONESHOT_GRACE_SECONDS = 120
@ -80,34 +81,52 @@ def _jobs_lock():
(field updates only no agent execution), so contention resolves in
milliseconds. If neither fcntl nor msvcrt is available the manager still
provides in-process locking, matching the historical behaviour.
Nested calls in the same thread reuse the held lock so legacy callers that
invoke save_jobs() inside a broader mutation section don't deadlock or try
to reacquire the advisory file lock.
"""
with _jobs_file_lock:
lock_fd = None
try:
ensure_dirs()
lock_fd = open(_jobs_lock_file(), "w", encoding="utf-8")
if fcntl is not None:
fcntl.flock(lock_fd, fcntl.LOCK_EX)
elif msvcrt is not None:
msvcrt.locking(lock_fd.fileno(), msvcrt.LK_LOCK, 1)
except (OSError, IOError) as e:
# Never let a locking failure take down cron writes — fall back to
# in-process-only protection (still held via _jobs_file_lock).
logger.warning("jobs.json cross-process lock unavailable (%s); "
"proceeding with in-process lock only", e)
depth = getattr(_jobs_lock_state, "depth", 0)
if depth:
_jobs_lock_state.depth = depth + 1
try:
yield
finally:
if lock_fd is not None:
try:
if fcntl is not None:
fcntl.flock(lock_fd, fcntl.LOCK_UN)
elif msvcrt is not None:
msvcrt.locking(lock_fd.fileno(), msvcrt.LK_UNLCK, 1)
except (OSError, IOError):
pass
finally:
lock_fd.close()
_jobs_lock_state.depth -= 1
return
with _jobs_file_lock:
_jobs_lock_state.depth = 1
lock_fd = None
try:
try:
ensure_dirs()
lock_fd = open(_jobs_lock_file(), "a+", encoding="utf-8")
lock_fd.seek(0)
if fcntl is not None:
fcntl.flock(lock_fd, fcntl.LOCK_EX)
elif msvcrt is not None:
getattr(msvcrt, "locking")(lock_fd.fileno(), getattr(msvcrt, "LK_LOCK"), 1)
except (OSError, IOError) as e:
# Never let a locking failure take down cron writes — fall back to
# in-process-only protection (still held via _jobs_file_lock).
logger.warning("jobs.json cross-process lock unavailable (%s); "
"proceeding with in-process lock only", e)
try:
yield
finally:
if lock_fd is not None:
try:
if fcntl is not None:
fcntl.flock(lock_fd, fcntl.LOCK_UN)
elif msvcrt is not None:
getattr(msvcrt, "locking")(lock_fd.fileno(), getattr(msvcrt, "LK_UNLCK"), 1)
except (OSError, IOError):
pass
finally:
lock_fd.close()
finally:
_jobs_lock_state.depth = 0
# Fields on a cron job that must never change after creation. ``id`` is used
# as a filesystem path component under ``OUTPUT_DIR``; allowing it to be
@ -532,8 +551,8 @@ def load_jobs() -> List[Dict[str, Any]]:
)
def save_jobs(jobs: List[Dict[str, Any]]):
"""Save all jobs to storage."""
def _save_jobs_unlocked(jobs: List[Dict[str, Any]]):
"""Save all jobs to storage. Caller must hold _jobs_lock()."""
ensure_dirs()
fd, tmp_path = tempfile.mkstemp(dir=str(JOBS_FILE.parent), suffix='.tmp', prefix='.jobs_')
try:
@ -551,6 +570,12 @@ def save_jobs(jobs: List[Dict[str, Any]]):
raise
def save_jobs(jobs: List[Dict[str, Any]]):
"""Save all jobs to storage."""
with _jobs_lock():
_save_jobs_unlocked(jobs)
def _normalize_workdir(workdir: Optional[str]) -> Optional[str]:
"""Normalize and validate a cron job workdir.
@ -1045,7 +1070,7 @@ def get_due_jobs() -> List[Dict[str, Any]]:
def _get_due_jobs_locked() -> List[Dict[str, Any]]:
"""Inner implementation of get_due_jobs(); must be called with _jobs_file_lock held."""
"""Inner implementation of get_due_jobs(); must be called with _jobs_lock held."""
now = _hermes_now()
raw_jobs = load_jobs()
jobs = [_apply_skill_fields(j) for j in copy.deepcopy(raw_jobs)]

View file

@ -89,6 +89,7 @@ AUTHOR_MAP = {
"tharushkadinujaya05@gmail.com": "0xneobyte",
"138671361+Veritas-7@users.noreply.github.com": "Veritas-7",
"keiron@onehanded.com": "kmccammon",
"268233388+CiarasClaws@users.noreply.github.com": "CiarasClaws",
"895252509@qq.com": "895252509",
"35259607+zxcasongs@users.noreply.github.com": "zxcasongs",
"alfred@my-cloud.me": "alfred-smith-0",

View file

@ -22,16 +22,24 @@ import time
import pytest
from cron import jobs
from hermes_constants import get_hermes_home
# Repo root (parent of the ``cron`` package) so the child process can import it.
_REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(jobs.__file__)))
@pytest.mark.skipif(jobs.fcntl is None, reason="POSIX fcntl/flock required")
def test_jobs_lock_excludes_another_process(tmp_path):
def test_jobs_lock_excludes_another_process(tmp_path, monkeypatch):
cron_dir = tmp_path / "cron"
output_dir = cron_dir / "output"
monkeypatch.setattr(jobs, "CRON_DIR", cron_dir)
monkeypatch.setattr(jobs, "JOBS_FILE", cron_dir / "jobs.json")
monkeypatch.setattr(jobs, "OUTPUT_DIR", output_dir)
ready = tmp_path / "child_holds_lock"
release = tmp_path / "child_may_release"
blocker_started = tmp_path / "blocker_started"
blocker_acquired = tmp_path / "blocker_acquired"
holder = tmp_path / "holder.py"
holder.write_text(
textwrap.dedent(
@ -40,6 +48,10 @@ def test_jobs_lock_excludes_another_process(tmp_path):
sys.path.insert(0, {_REPO_ROOT!r})
from cron import jobs
jobs.CRON_DIR = pathlib.Path({str(cron_dir)!r})
jobs.JOBS_FILE = jobs.CRON_DIR / "jobs.json"
jobs.OUTPUT_DIR = jobs.CRON_DIR / "output"
with jobs._jobs_lock():
pathlib.Path({str(ready)!r}).write_text("1")
# Hold the lock until the parent signals (bounded so a wedged
@ -52,7 +64,27 @@ def test_jobs_lock_excludes_another_process(tmp_path):
)
)
blocker = tmp_path / "blocker.py"
blocker.write_text(
textwrap.dedent(
f"""
import sys, pathlib
sys.path.insert(0, {_REPO_ROOT!r})
from cron import jobs
jobs.CRON_DIR = pathlib.Path({str(cron_dir)!r})
jobs.JOBS_FILE = jobs.CRON_DIR / "jobs.json"
jobs.OUTPUT_DIR = jobs.CRON_DIR / "output"
pathlib.Path({str(blocker_started)!r}).write_text("1")
with jobs._jobs_lock():
pathlib.Path({str(blocker_acquired)!r}).write_text("1")
"""
)
)
child = subprocess.Popen([sys.executable, str(holder)])
blocker_child = None
try:
# Wait until the child is inside the critical section.
for _ in range(1000):
@ -63,19 +95,32 @@ def test_jobs_lock_excludes_another_process(tmp_path):
# While the child holds it, a non-blocking acquire of the SAME lock file
# from this process must fail. A threading.Lock could never block here.
# Resolve the lock path at runtime (not jobs._JOBS_LOCK_FILE, which is
# bound at import time) so it matches the child even when the test suite
# redirects HERMES_HOME to a per-test tempdir.
lock_file = get_hermes_home() / "cron" / ".jobs.lock"
lock_file = jobs._jobs_lock_file()
fd = os.open(str(lock_file), os.O_RDWR | os.O_CREAT)
try:
with pytest.raises(OSError):
jobs.fcntl.flock(fd, jobs.fcntl.LOCK_EX | jobs.fcntl.LOCK_NB)
finally:
os.close(fd)
# A second _jobs_lock() caller in another process should block until the
# holder releases, rather than falling through with only a process-local
# threading lock.
blocker_child = subprocess.Popen([sys.executable, str(blocker)])
for _ in range(1000):
if blocker_started.exists():
break
time.sleep(0.01)
assert blocker_started.exists(), "blocker process never started"
time.sleep(0.05)
assert not blocker_acquired.exists(), "second process entered _jobs_lock() while held"
finally:
release.write_text("1")
child.wait(timeout=15)
if blocker_child is not None:
blocker_child.wait(timeout=15)
assert blocker_acquired.exists(), "second process did not acquire _jobs_lock() after release"
# Once the child has released, the lock is freely acquirable again.
with jobs._jobs_lock():