mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-17 09:41:58 +00:00
fix(cron): make jobs.json writes safe across processes
`hermes cron pause`/`resume`/`remove` run in their own CLI process (CLI → cronjob tool → pause_job → update_job → save_jobs), entirely separate from the gateway process that also writes jobs.json (mark_job_run, advance_next_run, due-fast-forward in get_due_jobs). The only synchronization was a module-level `threading.Lock`, which serializes writers *within a single process* but does nothing across processes — and update_job/pause_job/remove_job/create_job did not even take it. The result is a classic lost update: a `cron pause` issued while the gateway is live loads jobs.json, sets enabled=False, and saves; concurrently the gateway loads the same file and saves back its run-bookkeeping, clobbering the pause. The CLI prints "Paused" (it succeeded against its own in-memory copy) but the job stays enabled and keeps firing, with no error surfaced. The scheduler's `.tick.lock` flock can't be reused for this — it is held for the entire tick, including multi-minute agent runs, so a CLI mutation would block for minutes. Add `_jobs_lock()`: a short-held cross-process advisory file lock (fcntl/msvcrt flock on `<hermes_home>/cron/.jobs.lock`) layered over the existing in-process lock, and wrap every load→modify→save critical section with it — create_job, update_job, remove_job, mark_job_run, advance_next_run, get_due_jobs, rewrite_skill_refs. The lock degrades to in-process-only if neither fcntl nor msvcrt is available, preserving prior behaviour. All critical sections are short (field edits, no agent execution), so contention resolves in milliseconds. Adds a regression test that proves the lock excludes a second process (an in-process threading.Lock cannot). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
29c6985590
commit
e5b4cf7bea
2 changed files with 206 additions and 57 deletions
181
cron/jobs.py
181
cron/jobs.py
|
|
@ -5,6 +5,7 @@ Jobs are stored in ~/.hermes/cron/jobs.json
|
|||
Output is saved to ~/.hermes/cron/output/{job_id}/{timestamp}.md
|
||||
"""
|
||||
|
||||
import contextlib
|
||||
import copy
|
||||
import json
|
||||
import logging
|
||||
|
|
@ -14,6 +15,19 @@ import threading
|
|||
import os
|
||||
import re
|
||||
import uuid
|
||||
|
||||
# Cross-process advisory file locking for jobs.json critical sections.
|
||||
# fcntl is Unix-only; on Windows fall back to msvcrt. Either may be absent,
|
||||
# in which case _jobs_lock() degrades to in-process locking only (the old
|
||||
# behaviour) rather than failing.
|
||||
try:
|
||||
import fcntl
|
||||
except ImportError: # pragma: no cover - non-Unix
|
||||
fcntl = None
|
||||
try:
|
||||
import msvcrt
|
||||
except ImportError: # pragma: no cover - non-Windows
|
||||
msvcrt = None
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from hermes_constants import get_hermes_home
|
||||
|
|
@ -45,6 +59,56 @@ _jobs_file_lock = threading.Lock()
|
|||
OUTPUT_DIR = CRON_DIR / "output"
|
||||
ONESHOT_GRACE_SECONDS = 120
|
||||
|
||||
|
||||
def _jobs_lock_file() -> Path:
|
||||
"""Return the advisory lock path for the current cron directory."""
|
||||
return CRON_DIR / ".jobs.lock"
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _jobs_lock():
|
||||
"""Serialize a load_jobs→modify→save_jobs critical section.
|
||||
|
||||
Combines the in-process threading lock (cheap mutual exclusion between
|
||||
the gateway's parallel tick threads) with a cross-process advisory file
|
||||
lock on ``<cron dir>/.jobs.lock`` (mutual exclusion between the gateway process
|
||||
and standalone ``hermes`` CLI invocations, which previously shared no lock
|
||||
at all — a `cron pause` could be silently clobbered by a concurrent
|
||||
gateway write, leaving a "paused" job still firing).
|
||||
|
||||
The flock is blocking, but every critical section that uses it is short
|
||||
(field updates only — no agent execution), so contention resolves in
|
||||
milliseconds. If neither fcntl nor msvcrt is available the manager still
|
||||
provides in-process locking, matching the historical behaviour.
|
||||
"""
|
||||
with _jobs_file_lock:
|
||||
lock_fd = None
|
||||
try:
|
||||
ensure_dirs()
|
||||
lock_fd = open(_jobs_lock_file(), "w", encoding="utf-8")
|
||||
if fcntl is not None:
|
||||
fcntl.flock(lock_fd, fcntl.LOCK_EX)
|
||||
elif msvcrt is not None:
|
||||
msvcrt.locking(lock_fd.fileno(), msvcrt.LK_LOCK, 1)
|
||||
except (OSError, IOError) as e:
|
||||
# Never let a locking failure take down cron writes — fall back to
|
||||
# in-process-only protection (still held via _jobs_file_lock).
|
||||
logger.warning("jobs.json cross-process lock unavailable (%s); "
|
||||
"proceeding with in-process lock only", e)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
if lock_fd is not None:
|
||||
try:
|
||||
if fcntl is not None:
|
||||
fcntl.flock(lock_fd, fcntl.LOCK_UN)
|
||||
elif msvcrt is not None:
|
||||
msvcrt.locking(lock_fd.fileno(), msvcrt.LK_UNLCK, 1)
|
||||
except (OSError, IOError):
|
||||
pass
|
||||
finally:
|
||||
lock_fd.close()
|
||||
|
||||
# Fields on a cron job that must never change after creation. ``id`` is used
|
||||
# as a filesystem path component under ``OUTPUT_DIR``; allowing it to be
|
||||
# updated lets an unsafe value (``../escape``, absolute path, nested) leak
|
||||
|
|
@ -670,9 +734,10 @@ def create_job(
|
|||
"workdir": normalized_workdir,
|
||||
}
|
||||
|
||||
jobs = load_jobs()
|
||||
jobs.append(job)
|
||||
save_jobs(jobs)
|
||||
with _jobs_lock():
|
||||
jobs = load_jobs()
|
||||
jobs.append(job)
|
||||
save_jobs(jobs)
|
||||
|
||||
return job
|
||||
|
||||
|
|
@ -743,49 +808,50 @@ def update_job(job_id: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]
|
|||
f"Cron job field(s) cannot be updated: {', '.join(sorted(bad_fields))}"
|
||||
)
|
||||
|
||||
jobs = load_jobs()
|
||||
for i, job in enumerate(jobs):
|
||||
if job["id"] != job_id:
|
||||
continue
|
||||
with _jobs_lock():
|
||||
jobs = load_jobs()
|
||||
for i, job in enumerate(jobs):
|
||||
if job["id"] != job_id:
|
||||
continue
|
||||
|
||||
# Validate / normalize workdir if present in updates. Empty string or
|
||||
# None both mean "clear the field" (restore old behaviour).
|
||||
if "workdir" in updates:
|
||||
_wd = updates["workdir"]
|
||||
if _wd in {None, "", False}:
|
||||
updates["workdir"] = None
|
||||
else:
|
||||
updates["workdir"] = _normalize_workdir(_wd)
|
||||
# Validate / normalize workdir if present in updates. Empty string
|
||||
# or None both mean "clear the field" (restore old behaviour).
|
||||
if "workdir" in updates:
|
||||
_wd = updates["workdir"]
|
||||
if _wd in {None, "", False}:
|
||||
updates["workdir"] = None
|
||||
else:
|
||||
updates["workdir"] = _normalize_workdir(_wd)
|
||||
|
||||
updated = _apply_skill_fields({**job, **updates})
|
||||
schedule_changed = "schedule" in updates
|
||||
updated = _apply_skill_fields({**job, **updates})
|
||||
schedule_changed = "schedule" in updates
|
||||
|
||||
if "skills" in updates or "skill" in updates:
|
||||
normalized_skills = _normalize_skill_list(updated.get("skill"), updated.get("skills"))
|
||||
updated["skills"] = normalized_skills
|
||||
updated["skill"] = normalized_skills[0] if normalized_skills else None
|
||||
if "skills" in updates or "skill" in updates:
|
||||
normalized_skills = _normalize_skill_list(updated.get("skill"), updated.get("skills"))
|
||||
updated["skills"] = normalized_skills
|
||||
updated["skill"] = normalized_skills[0] if normalized_skills else None
|
||||
|
||||
if schedule_changed:
|
||||
updated_schedule = updated["schedule"]
|
||||
# The API may pass schedule as a raw string (e.g. "every 10m")
|
||||
# instead of a pre-parsed dict. Normalize it the same way
|
||||
# create_job() does so downstream code can call .get() safely.
|
||||
if isinstance(updated_schedule, str):
|
||||
updated_schedule = parse_schedule(updated_schedule)
|
||||
updated["schedule"] = updated_schedule
|
||||
updated["schedule_display"] = updates.get(
|
||||
"schedule_display",
|
||||
updated_schedule.get("display", updated.get("schedule_display")),
|
||||
)
|
||||
if updated.get("state") != "paused":
|
||||
updated["next_run_at"] = compute_next_run(updated_schedule)
|
||||
if schedule_changed:
|
||||
updated_schedule = updated["schedule"]
|
||||
# The API may pass schedule as a raw string (e.g. "every 10m")
|
||||
# instead of a pre-parsed dict. Normalize it the same way
|
||||
# create_job() does so downstream code can call .get() safely.
|
||||
if isinstance(updated_schedule, str):
|
||||
updated_schedule = parse_schedule(updated_schedule)
|
||||
updated["schedule"] = updated_schedule
|
||||
updated["schedule_display"] = updates.get(
|
||||
"schedule_display",
|
||||
updated_schedule.get("display", updated.get("schedule_display")),
|
||||
)
|
||||
if updated.get("state") != "paused":
|
||||
updated["next_run_at"] = compute_next_run(updated_schedule)
|
||||
|
||||
if updated.get("enabled", True) and updated.get("state") != "paused" and not updated.get("next_run_at"):
|
||||
updated["next_run_at"] = compute_next_run(updated["schedule"])
|
||||
if updated.get("enabled", True) and updated.get("state") != "paused" and not updated.get("next_run_at"):
|
||||
updated["next_run_at"] = compute_next_run(updated["schedule"])
|
||||
|
||||
jobs[i] = updated
|
||||
save_jobs(jobs)
|
||||
return _normalize_job_record(jobs[i])
|
||||
jobs[i] = updated
|
||||
save_jobs(jobs)
|
||||
return _normalize_job_record(jobs[i])
|
||||
return None
|
||||
|
||||
|
||||
|
|
@ -847,19 +913,20 @@ def remove_job(job_id: str) -> bool:
|
|||
if not job:
|
||||
return False
|
||||
canonical_id = job["id"]
|
||||
jobs = load_jobs()
|
||||
original_len = len(jobs)
|
||||
jobs = [j for j in jobs if j["id"] != canonical_id]
|
||||
if len(jobs) < original_len:
|
||||
# Resolve the output dir BEFORE saving so a legacy unsafe ID (e.g.
|
||||
# left over from before the create-time guard) fails closed without
|
||||
# half-applying the removal.
|
||||
job_output_dir = _job_output_dir(canonical_id)
|
||||
save_jobs(jobs)
|
||||
# Clean up output directory to prevent orphaned dirs accumulating
|
||||
if job_output_dir.exists():
|
||||
shutil.rmtree(job_output_dir)
|
||||
return True
|
||||
with _jobs_lock():
|
||||
jobs = load_jobs()
|
||||
original_len = len(jobs)
|
||||
jobs = [j for j in jobs if j["id"] != canonical_id]
|
||||
if len(jobs) < original_len:
|
||||
# Resolve the output dir BEFORE saving so a legacy unsafe ID (e.g.
|
||||
# left over from before the create-time guard) fails closed without
|
||||
# half-applying the removal.
|
||||
job_output_dir = _job_output_dir(canonical_id)
|
||||
save_jobs(jobs)
|
||||
# Clean up output directory to prevent orphaned dirs accumulating
|
||||
if job_output_dir.exists():
|
||||
shutil.rmtree(job_output_dir)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
|
|
@ -874,7 +941,7 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None,
|
|||
``delivery_error`` is tracked separately from the agent error — a job
|
||||
can succeed (agent produced output) but fail delivery (platform down).
|
||||
"""
|
||||
with _jobs_file_lock:
|
||||
with _jobs_lock():
|
||||
jobs = load_jobs()
|
||||
for i, job in enumerate(jobs):
|
||||
if job["id"] == job_id:
|
||||
|
|
@ -948,7 +1015,7 @@ def advance_next_run(job_id: str) -> bool:
|
|||
|
||||
Returns True if next_run_at was advanced, False otherwise.
|
||||
"""
|
||||
with _jobs_file_lock:
|
||||
with _jobs_lock():
|
||||
jobs = load_jobs()
|
||||
for job in jobs:
|
||||
if job["id"] == job_id:
|
||||
|
|
@ -973,7 +1040,7 @@ def get_due_jobs() -> List[Dict[str, Any]]:
|
|||
the job is fast-forwarded to the next future run instead of firing
|
||||
immediately. This prevents a burst of missed jobs on gateway restart.
|
||||
"""
|
||||
with _jobs_file_lock:
|
||||
with _jobs_lock():
|
||||
return _get_due_jobs_locked()
|
||||
|
||||
|
||||
|
|
@ -1158,7 +1225,7 @@ def rewrite_skill_refs(
|
|||
if not consolidated and not pruned_set:
|
||||
return {"rewrites": [], "jobs_updated": 0, "jobs_scanned": 0}
|
||||
|
||||
with _jobs_file_lock:
|
||||
with _jobs_lock():
|
||||
jobs = load_jobs()
|
||||
rewrites: List[Dict[str, Any]] = []
|
||||
changed = False
|
||||
|
|
|
|||
82
tests/cron/test_jobs_crossprocess_lock.py
Normal file
82
tests/cron/test_jobs_crossprocess_lock.py
Normal file
|
|
@ -0,0 +1,82 @@
|
|||
"""Regression test for the jobs.json cross-process lock.
|
||||
|
||||
Background: ``hermes cron pause`` runs in its own process (CLI → cronjob tool →
|
||||
``pause_job`` → ``update_job`` → ``save_jobs``), entirely separate from the
|
||||
gateway process that also writes ``jobs.json`` (``mark_job_run`` /
|
||||
``advance_next_run`` / due-fast-forward). The module's ``threading.Lock`` only
|
||||
serializes writers *inside one process*, so a CLI pause issued while the gateway
|
||||
was live could be silently lost to a concurrent gateway write — the job kept
|
||||
firing even though the CLI reported "Paused".
|
||||
|
||||
``_jobs_lock()`` closes that gap with a short-held cross-process advisory file
|
||||
lock. This test proves the lock actually excludes a *separate process*, which an
|
||||
in-process ``threading.Lock`` cannot do.
|
||||
"""
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import textwrap
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from cron import jobs
|
||||
from hermes_constants import get_hermes_home
|
||||
|
||||
# Repo root (parent of the ``cron`` package) so the child process can import it.
|
||||
_REPO_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(jobs.__file__)))
|
||||
|
||||
|
||||
@pytest.mark.skipif(jobs.fcntl is None, reason="POSIX fcntl/flock required")
|
||||
def test_jobs_lock_excludes_another_process(tmp_path):
|
||||
ready = tmp_path / "child_holds_lock"
|
||||
release = tmp_path / "child_may_release"
|
||||
holder = tmp_path / "holder.py"
|
||||
holder.write_text(
|
||||
textwrap.dedent(
|
||||
f"""
|
||||
import sys, time, pathlib
|
||||
sys.path.insert(0, {_REPO_ROOT!r})
|
||||
from cron import jobs
|
||||
|
||||
with jobs._jobs_lock():
|
||||
pathlib.Path({str(ready)!r}).write_text("1")
|
||||
# Hold the lock until the parent signals (bounded so a wedged
|
||||
# test can never hang CI).
|
||||
for _ in range(1000):
|
||||
if pathlib.Path({str(release)!r}).exists():
|
||||
break
|
||||
time.sleep(0.01)
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
child = subprocess.Popen([sys.executable, str(holder)])
|
||||
try:
|
||||
# Wait until the child is inside the critical section.
|
||||
for _ in range(1000):
|
||||
if ready.exists():
|
||||
break
|
||||
time.sleep(0.01)
|
||||
assert ready.exists(), "child never acquired _jobs_lock()"
|
||||
|
||||
# While the child holds it, a non-blocking acquire of the SAME lock file
|
||||
# from this process must fail. A threading.Lock could never block here.
|
||||
# Resolve the lock path at runtime (not jobs._JOBS_LOCK_FILE, which is
|
||||
# bound at import time) so it matches the child even when the test suite
|
||||
# redirects HERMES_HOME to a per-test tempdir.
|
||||
lock_file = get_hermes_home() / "cron" / ".jobs.lock"
|
||||
fd = os.open(str(lock_file), os.O_RDWR | os.O_CREAT)
|
||||
try:
|
||||
with pytest.raises(OSError):
|
||||
jobs.fcntl.flock(fd, jobs.fcntl.LOCK_EX | jobs.fcntl.LOCK_NB)
|
||||
finally:
|
||||
os.close(fd)
|
||||
finally:
|
||||
release.write_text("1")
|
||||
child.wait(timeout=15)
|
||||
|
||||
# Once the child has released, the lock is freely acquirable again.
|
||||
with jobs._jobs_lock():
|
||||
pass
|
||||
Loading…
Add table
Add a link
Reference in a new issue