From b01eee0c77e182f1c6f9d101c5851fbe4b5efae3 Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 18 Jun 2026 14:34:34 +1000 Subject: [PATCH] feat(cron): store-level CAS claim for multi-machine at-most-once fire MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 4C. claim_job_for_fire(job_id, *, claim_ttl_seconds=300) in cron/jobs.py: under the existing _jobs_lock() file lock, claim a job for a single external fire so that across N gateway replicas exactly ONE wins. Single-machine deployments always win (unaffected). Semantics: - missing / disabled / paused job → False. - a fresh fire_claim (younger than claim_ttl_seconds) already present → False (someone else holds it). Stale claim (crashed winner) → overwrite, so a job is never wedged forever. - on win: stamp fire_claim={at, by:_machine_id()}; for recurring (cron/interval) advance next_run_at (mirrors advance_next_run's at-most-once bump so a stale re-delivery can't re-fire); one-shots keep next_run_at but the fresh claim blocks a duplicate retry for the same fire. - mark_job_run now clears fire_claim on completion so a re-armed recurring job is claimable again next fire. _machine_id() (HERMES_MACHINE_ID env, else hostname:pid) is attribution-only; correctness is the file lock + fresh-claim check, not the id. This is consumed by CronScheduler.fire_due (Phase 4B). tick is untouched — it still uses advance_next_run, so the built-in single-machine path is unaffected. Tests (real store, temp HERMES_HOME): claim-once-then-block + next_run advance, one-shot no-double-claim, unknown→False, paused→False, stale-claim reclaimable, mark_job_run clears the claim (recurring re-claimable). tests/cron/ 470 passed. --- cron/jobs.py | 68 ++++++++++++++++++++++ tests/cron/test_claim_job_for_fire.py | 84 +++++++++++++++++++++++++++ 2 files changed, 152 insertions(+) create mode 100644 tests/cron/test_claim_job_for_fire.py diff --git a/cron/jobs.py b/cron/jobs.py index 178bd0fad81..2f44608d649 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -976,6 +976,9 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None, job["last_error"] = error if not success else None # Track delivery failures separately — cleared on successful delivery job["last_delivery_error"] = delivery_error + # Clear any external-fire claim so a re-armed recurring job can + # be claimed again on its next fire (Phase 4C CAS). + job["fire_claim"] = None # Increment completed count if job.get("repeat"): @@ -1057,6 +1060,71 @@ def advance_next_run(job_id: str) -> bool: return False +def _machine_id() -> str: + """Stable-ish identifier for claim attribution/debugging (NOT correctness). + + Uses ``HERMES_MACHINE_ID`` if set, else hostname + pid. The CAS correctness + comes from the file lock + the fresh-claim check, not from this value. + """ + explicit = os.getenv("HERMES_MACHINE_ID", "").strip() + if explicit: + return explicit + try: + import socket + host = socket.gethostname() + except Exception: + host = "unknown" + return f"{host}:{os.getpid()}" + + +def claim_job_for_fire(job_id: str, *, claim_ttl_seconds: int = 300) -> bool: + """Atomically claim a job for a single external 'fire' (multi-machine + at-most-once). Returns True iff THIS caller won the claim. + + Used by the external-provider fire path (``CronScheduler.fire_due``) when an + external scheduler (Chronos) signals a job is due across N gateway replicas: + exactly one wins. Single-machine deployments always win. + + Under the file lock: reject if the job is missing/disabled/paused. If a + fresh claim (younger than ``claim_ttl_seconds``) already exists, lose. + Otherwise stamp a ``fire_claim`` and, for recurring jobs, advance + ``next_run_at`` (mirrors ``advance_next_run``'s at-most-once bump so a stale + re-delivery for the old time can't re-fire). One-shots keep ``next_run_at`` + but the fresh ``fire_claim`` blocks a duplicate retry for the same fire. + ``mark_job_run`` clears the claim on completion so a re-armed recurring job + is claimable again next fire. + + The stale-claim TTL means a machine that crashed after claiming but before + completing doesn't wedge the job forever — after the TTL another fire can + reclaim it. + """ + with _jobs_lock(): + jobs = load_jobs() + for job in jobs: + if job["id"] != job_id: + continue + if not job.get("enabled", True) or job.get("state") == "paused": + return False + now = _hermes_now() + existing = job.get("fire_claim") + if existing: + try: + claimed_at = _ensure_aware(datetime.fromisoformat(existing["at"])) + if (now - claimed_at).total_seconds() < claim_ttl_seconds: + return False # someone holds a fresh claim + except Exception: + pass # malformed claim → overwrite + job["fire_claim"] = {"at": now.isoformat(), "by": _machine_id()} + kind = job.get("schedule", {}).get("kind") + if kind in {"cron", "interval"}: + nxt = compute_next_run(job["schedule"], now.isoformat()) + if nxt: + job["next_run_at"] = nxt + save_jobs(jobs) + return True + return False + + def get_due_jobs() -> List[Dict[str, Any]]: """Get all jobs that are due to run now. diff --git a/tests/cron/test_claim_job_for_fire.py b/tests/cron/test_claim_job_for_fire.py new file mode 100644 index 00000000000..abbe969eb04 --- /dev/null +++ b/tests/cron/test_claim_job_for_fire.py @@ -0,0 +1,84 @@ +"""Tests for the store-level CAS fire claim (Phase 4C). + +`claim_job_for_fire` gives multi-machine at-most-once semantics when an external +scheduler (Chronos) fires a job: across N gateway replicas, exactly ONE wins the +claim for a given fire. Single-machine deployments always win (unaffected). + +These exercise the real store against a temp HERMES_HOME (no mocks) per the +E2E-over-mocks discipline for file-touching code. +""" +import pytest + + +@pytest.fixture +def temp_home(tmp_path, monkeypatch): + """Isolated HERMES_HOME so jobs.json doesn't touch the real store.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + # cron.jobs caches no home at import; get_hermes_home() reads the env live. + yield tmp_path + + +def test_claim_succeeds_once_then_blocks(temp_home): + """First claim for a fire wins; a second claim for the same fire loses, and + next_run_at is advanced (a re-delivery for the old time can't re-fire).""" + from cron.jobs import create_job, claim_job_for_fire, get_job + + job = create_job(prompt="x", schedule="every 5m", name="t") + jid = job["id"] + before = get_job(jid)["next_run_at"] + + assert claim_job_for_fire(jid) is True + assert claim_job_for_fire(jid) is False + assert get_job(jid)["next_run_at"] != before + + +def test_claim_oneshot_cannot_be_double_claimed(temp_home): + """A one-shot can't be double-claimed (the fresh claim blocks the retry).""" + from cron.jobs import create_job, claim_job_for_fire + + job = create_job(prompt="x", schedule="30m", name="o") + assert claim_job_for_fire(job["id"]) is True + assert claim_job_for_fire(job["id"]) is False + + +def test_claim_unknown_job_returns_false(temp_home): + from cron.jobs import claim_job_for_fire + + assert claim_job_for_fire("nope-does-not-exist") is False + + +def test_claim_paused_job_returns_false(temp_home): + """A paused job can't be claimed.""" + from cron.jobs import create_job, claim_job_for_fire, pause_job + + job = create_job(prompt="x", schedule="every 5m", name="p") + pause_job(job["id"]) + assert claim_job_for_fire(job["id"]) is False + + +def test_stale_claim_is_reclaimable(temp_home, monkeypatch): + """A claim older than the TTL is overwritten — the fire isn't stuck forever + if the winning machine crashed before mark_job_run cleared the claim.""" + from cron.jobs import create_job, claim_job_for_fire + + job = create_job(prompt="x", schedule="every 5m", name="s") + jid = job["id"] + assert claim_job_for_fire(jid) is True + # With a 0s TTL, the existing claim is always considered stale. + assert claim_job_for_fire(jid, claim_ttl_seconds=0) is True + + +def test_mark_job_run_clears_claim(temp_home): + """After a recurring job completes, its claim is cleared so the next fire + can be claimed again.""" + from cron.jobs import create_job, claim_job_for_fire, mark_job_run, get_job + + job = create_job(prompt="x", schedule="every 5m", name="c") + jid = job["id"] + assert claim_job_for_fire(jid) is True + assert get_job(jid).get("fire_claim") is not None + + mark_job_run(jid, success=True) + assert get_job(jid).get("fire_claim") is None + # …and the re-armed recurring job is claimable again. + assert claim_job_for_fire(jid) is True