From 6ff5fd373b6695b1ed7b7e0f63fde6a8430d16e6 Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 18 Jun 2026 14:30:31 +1000 Subject: [PATCH] feat(cron): additive CronScheduler hooks (on_jobs_changed/fire_due/reconcile) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 4B. Three NON-abstract hooks on the CronScheduler ABC, all with built-in-safe defaults so the built-in inherits them without overriding and test_abc_growth_stays_additive stays green (required surface still {name, start}): - on_jobs_changed(): post-mutation reconcile hook. Built-in no-op. - fire_due(job_id): claim the job via the store CAS (claim_job_for_fire, Phase 4C) then run it through the shared run_one_job (Phase 4A). Returns False if the claim is lost or the job vanished (repeat-N exhausted between arm and fire). The inbound webhook (Phase 4E) routes here. - reconcile(): converge the external registry toward jobs.json. Built-in no-op. fire_due imports claim_job_for_fire/get_job/run_one_job INSIDE the method, so this commits cleanly before Phase 4C lands claim_job_for_fire (import-time is unaffected; tests monkeypatch it with raising=False). Tests: required-surface-unchanged guard, built-in inherits no-op defaults, and fire_due's three paths (claim+run, lost-claim→no-run, missing-job→no-run). tests/cron/ green (20 in test_scheduler_provider.py). --- cron/scheduler_provider.py | 39 +++++++++++++++ tests/cron/test_scheduler_provider.py | 70 +++++++++++++++++++++++++++ 2 files changed, 109 insertions(+) diff --git a/cron/scheduler_provider.py b/cron/scheduler_provider.py index 45243e7749c..50bca6b892b 100644 --- a/cron/scheduler_provider.py +++ b/cron/scheduler_provider.py @@ -71,6 +71,45 @@ class CronScheduler(ABC): resources (queue consumers, HTTP servers).""" return None + # --- Optional hooks for external providers (added Phase 4). -------------- + # All default-safe so the built-in inherits working behavior without + # overriding. Keep these NON-abstract — see test_abc_growth_stays_additive. + + def on_jobs_changed(self) -> None: + """Called after a successful store mutation (create/update/remove/ + pause/resume). External providers reconcile their registry here (e.g. + Chronos re-provisions/cancels the affected one-shot via NAS). + Built-in: no-op (it re-reads jobs.json on every tick).""" + return None + + def fire_due(self, job_id: str, *, adapters: Any = None, loop: Any = None) -> bool: + """Run a single job NOW via the shared orchestrator. Called by the + inbound fire webhook when an external scheduler signals a job is due. + + The default claims the job with a store-level compare-and-set + (multi-machine at-most-once), then runs it via the shared + ``run_one_job`` body. Built-in never calls this (it has its own tick + loop); an external provider routes its inbound fire here. + + Returns True if THIS caller claimed and ran the job, False if the claim + was lost (another machine/retry won it) or the job no longer exists. + """ + from cron.jobs import claim_job_for_fire, get_job + from cron.scheduler import run_one_job + + if not claim_job_for_fire(job_id): + return False # another machine already claimed this fire + job = get_job(job_id) + if job is None: + return False # job removed (e.g. repeat-N exhausted) between arm and fire + return run_one_job(job, adapters=adapters, loop=loop) + + def reconcile(self) -> None: + """Converge the external registry toward jobs.json (the desired state): + arm missing one-shots, cancel orphaned ones, re-arm changed times. + Built-in: no-op.""" + return None + def resolve_cron_scheduler() -> "CronScheduler": """Return the active cron scheduler provider. diff --git a/tests/cron/test_scheduler_provider.py b/tests/cron/test_scheduler_provider.py index 8fdbb305a0f..2b2e159e2a3 100644 --- a/tests/cron/test_scheduler_provider.py +++ b/tests/cron/test_scheduler_provider.py @@ -262,3 +262,73 @@ def test_resolve_available_provider_is_used(monkeypatch): monkeypatch.setattr(pc, "load_cron_scheduler", lambda n: Fake()) prov = sp.resolve_cron_scheduler() assert prov.name == "fake" + + +# ── Phase 4B: additive hooks (on_jobs_changed / fire_due / reconcile) ──────── + + +def test_hooks_did_not_change_required_surface(): + """The additive hooks must NOT become abstractmethods — the Phase-1 guard + still holds (required surface is exactly name + start).""" + from cron.scheduler_provider import CronScheduler + + assert set(CronScheduler.__abstractmethods__) == {"name", "start"} + + +def test_builtin_inherits_hook_defaults(): + """The built-in inherits no-op defaults for the new hooks (it never needs + to override them).""" + from cron.scheduler_provider import InProcessCronScheduler + + p = InProcessCronScheduler() + assert p.on_jobs_changed() is None + assert p.reconcile() is None + # built-in does not override fire_due; it simply isn't called for built-in. + assert hasattr(p, "fire_due") + + +def test_fire_due_default_claims_then_runs(monkeypatch): + """The default fire_due claims via the store CAS, fetches the job, and runs + it through the shared run_one_job body.""" + import cron.jobs as jobs + import cron.scheduler as sched + from cron.scheduler_provider import InProcessCronScheduler + + ran = [] + monkeypatch.setattr(jobs, "claim_job_for_fire", lambda jid: True, raising=False) + monkeypatch.setattr(jobs, "get_job", lambda jid: {"id": jid, "name": "t"}) + monkeypatch.setattr(sched, "run_one_job", lambda job, **kw: ran.append(job["id"]) or True) + + assert InProcessCronScheduler().fire_due("j1") is True + assert ran == ["j1"] + + +def test_fire_due_lost_claim_does_not_run(monkeypatch): + """If the CAS claim is lost (another machine/retry won), fire_due returns + False and never runs the job.""" + import cron.jobs as jobs + import cron.scheduler as sched + from cron.scheduler_provider import InProcessCronScheduler + + ran = [] + monkeypatch.setattr(jobs, "claim_job_for_fire", lambda jid: False, raising=False) + monkeypatch.setattr(sched, "run_one_job", lambda job, **kw: ran.append(job["id"]) or True) + + assert InProcessCronScheduler().fire_due("j1") is False + assert ran == [] + + +def test_fire_due_missing_job_does_not_run(monkeypatch): + """If the job vanished between arm and fire (e.g. repeat-N exhausted), + fire_due returns False without running.""" + import cron.jobs as jobs + import cron.scheduler as sched + from cron.scheduler_provider import InProcessCronScheduler + + ran = [] + monkeypatch.setattr(jobs, "claim_job_for_fire", lambda jid: True, raising=False) + monkeypatch.setattr(jobs, "get_job", lambda jid: None) + monkeypatch.setattr(sched, "run_one_job", lambda job, **kw: ran.append(job["id"]) or True) + + assert InProcessCronScheduler().fire_due("gone") is False + assert ran == []