feat(cron): store-level CAS claim for multi-machine at-most-once fire

Phase 4C. claim_job_for_fire(job_id, *, claim_ttl_seconds=300) in cron/jobs.py:
under the existing _jobs_lock() file lock, claim a job for a single external
fire so that across N gateway replicas exactly ONE wins. Single-machine
deployments always win (unaffected).

Semantics:
- missing / disabled / paused job → False.
- a fresh fire_claim (younger than claim_ttl_seconds) already present → False
  (someone else holds it). Stale claim (crashed winner) → overwrite, so a job
  is never wedged forever.
- on win: stamp fire_claim={at, by:_machine_id()}; for recurring (cron/interval)
  advance next_run_at (mirrors advance_next_run's at-most-once bump so a stale
  re-delivery can't re-fire); one-shots keep next_run_at but the fresh claim
  blocks a duplicate retry for the same fire.
- mark_job_run now clears fire_claim on completion so a re-armed recurring job
  is claimable again next fire.

_machine_id() (HERMES_MACHINE_ID env, else hostname:pid) is attribution-only;
correctness is the file lock + fresh-claim check, not the id.

This is consumed by CronScheduler.fire_due (Phase 4B). tick is untouched — it
still uses advance_next_run, so the built-in single-machine path is unaffected.

Tests (real store, temp HERMES_HOME): claim-once-then-block + next_run advance,
one-shot no-double-claim, unknown→False, paused→False, stale-claim reclaimable,
mark_job_run clears the claim (recurring re-claimable). tests/cron/ 470 passed.
This commit is contained in:
Ben 2026-06-18 14:34:34 +10:00
parent 6ff5fd373b
commit b01eee0c77
2 changed files with 152 additions and 0 deletions

View file

@ -976,6 +976,9 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None,
job["last_error"] = error if not success else None
# Track delivery failures separately — cleared on successful delivery
job["last_delivery_error"] = delivery_error
# Clear any external-fire claim so a re-armed recurring job can
# be claimed again on its next fire (Phase 4C CAS).
job["fire_claim"] = None
# Increment completed count
if job.get("repeat"):
@ -1057,6 +1060,71 @@ def advance_next_run(job_id: str) -> bool:
return False
def _machine_id() -> str:
"""Stable-ish identifier for claim attribution/debugging (NOT correctness).
Uses ``HERMES_MACHINE_ID`` if set, else hostname + pid. The CAS correctness
comes from the file lock + the fresh-claim check, not from this value.
"""
explicit = os.getenv("HERMES_MACHINE_ID", "").strip()
if explicit:
return explicit
try:
import socket
host = socket.gethostname()
except Exception:
host = "unknown"
return f"{host}:{os.getpid()}"
def claim_job_for_fire(job_id: str, *, claim_ttl_seconds: int = 300) -> bool:
"""Atomically claim a job for a single external 'fire' (multi-machine
at-most-once). Returns True iff THIS caller won the claim.
Used by the external-provider fire path (``CronScheduler.fire_due``) when an
external scheduler (Chronos) signals a job is due across N gateway replicas:
exactly one wins. Single-machine deployments always win.
Under the file lock: reject if the job is missing/disabled/paused. If a
fresh claim (younger than ``claim_ttl_seconds``) already exists, lose.
Otherwise stamp a ``fire_claim`` and, for recurring jobs, advance
``next_run_at`` (mirrors ``advance_next_run``'s at-most-once bump so a stale
re-delivery for the old time can't re-fire). One-shots keep ``next_run_at``
but the fresh ``fire_claim`` blocks a duplicate retry for the same fire.
``mark_job_run`` clears the claim on completion so a re-armed recurring job
is claimable again next fire.
The stale-claim TTL means a machine that crashed after claiming but before
completing doesn't wedge the job forever — after the TTL another fire can
reclaim it.
"""
with _jobs_lock():
jobs = load_jobs()
for job in jobs:
if job["id"] != job_id:
continue
if not job.get("enabled", True) or job.get("state") == "paused":
return False
now = _hermes_now()
existing = job.get("fire_claim")
if existing:
try:
claimed_at = _ensure_aware(datetime.fromisoformat(existing["at"]))
if (now - claimed_at).total_seconds() < claim_ttl_seconds:
return False # someone holds a fresh claim
except Exception:
pass # malformed claim → overwrite
job["fire_claim"] = {"at": now.isoformat(), "by": _machine_id()}
kind = job.get("schedule", {}).get("kind")
if kind in {"cron", "interval"}:
nxt = compute_next_run(job["schedule"], now.isoformat())
if nxt:
job["next_run_at"] = nxt
save_jobs(jobs)
return True
return False
def get_due_jobs() -> List[Dict[str, Any]]:
"""Get all jobs that are due to run now.

View file

@ -0,0 +1,84 @@
"""Tests for the store-level CAS fire claim (Phase 4C).
`claim_job_for_fire` gives multi-machine at-most-once semantics when an external
scheduler (Chronos) fires a job: across N gateway replicas, exactly ONE wins the
claim for a given fire. Single-machine deployments always win (unaffected).
These exercise the real store against a temp HERMES_HOME (no mocks) per the
E2E-over-mocks discipline for file-touching code.
"""
import pytest
@pytest.fixture
def temp_home(tmp_path, monkeypatch):
"""Isolated HERMES_HOME so jobs.json doesn't touch the real store."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
# cron.jobs caches no home at import; get_hermes_home() reads the env live.
yield tmp_path
def test_claim_succeeds_once_then_blocks(temp_home):
"""First claim for a fire wins; a second claim for the same fire loses, and
next_run_at is advanced (a re-delivery for the old time can't re-fire)."""
from cron.jobs import create_job, claim_job_for_fire, get_job
job = create_job(prompt="x", schedule="every 5m", name="t")
jid = job["id"]
before = get_job(jid)["next_run_at"]
assert claim_job_for_fire(jid) is True
assert claim_job_for_fire(jid) is False
assert get_job(jid)["next_run_at"] != before
def test_claim_oneshot_cannot_be_double_claimed(temp_home):
"""A one-shot can't be double-claimed (the fresh claim blocks the retry)."""
from cron.jobs import create_job, claim_job_for_fire
job = create_job(prompt="x", schedule="30m", name="o")
assert claim_job_for_fire(job["id"]) is True
assert claim_job_for_fire(job["id"]) is False
def test_claim_unknown_job_returns_false(temp_home):
from cron.jobs import claim_job_for_fire
assert claim_job_for_fire("nope-does-not-exist") is False
def test_claim_paused_job_returns_false(temp_home):
"""A paused job can't be claimed."""
from cron.jobs import create_job, claim_job_for_fire, pause_job
job = create_job(prompt="x", schedule="every 5m", name="p")
pause_job(job["id"])
assert claim_job_for_fire(job["id"]) is False
def test_stale_claim_is_reclaimable(temp_home, monkeypatch):
"""A claim older than the TTL is overwritten — the fire isn't stuck forever
if the winning machine crashed before mark_job_run cleared the claim."""
from cron.jobs import create_job, claim_job_for_fire
job = create_job(prompt="x", schedule="every 5m", name="s")
jid = job["id"]
assert claim_job_for_fire(jid) is True
# With a 0s TTL, the existing claim is always considered stale.
assert claim_job_for_fire(jid, claim_ttl_seconds=0) is True
def test_mark_job_run_clears_claim(temp_home):
"""After a recurring job completes, its claim is cleared so the next fire
can be claimed again."""
from cron.jobs import create_job, claim_job_for_fire, mark_job_run, get_job
job = create_job(prompt="x", schedule="every 5m", name="c")
jid = job["id"]
assert claim_job_for_fire(jid) is True
assert get_job(jid).get("fire_claim") is not None
mark_job_run(jid, success=True)
assert get_job(jid).get("fire_claim") is None
# …and the re-armed recurring job is claimable again.
assert claim_job_for_fire(jid) is True