diff --git a/cron/scheduler_provider.py b/cron/scheduler_provider.py index 45243e7749c..50bca6b892b 100644 --- a/cron/scheduler_provider.py +++ b/cron/scheduler_provider.py @@ -71,6 +71,45 @@ class CronScheduler(ABC): resources (queue consumers, HTTP servers).""" return None + # --- Optional hooks for external providers (added Phase 4). -------------- + # All default-safe so the built-in inherits working behavior without + # overriding. Keep these NON-abstract — see test_abc_growth_stays_additive. + + def on_jobs_changed(self) -> None: + """Called after a successful store mutation (create/update/remove/ + pause/resume). External providers reconcile their registry here (e.g. + Chronos re-provisions/cancels the affected one-shot via NAS). + Built-in: no-op (it re-reads jobs.json on every tick).""" + return None + + def fire_due(self, job_id: str, *, adapters: Any = None, loop: Any = None) -> bool: + """Run a single job NOW via the shared orchestrator. Called by the + inbound fire webhook when an external scheduler signals a job is due. + + The default claims the job with a store-level compare-and-set + (multi-machine at-most-once), then runs it via the shared + ``run_one_job`` body. Built-in never calls this (it has its own tick + loop); an external provider routes its inbound fire here. + + Returns True if THIS caller claimed and ran the job, False if the claim + was lost (another machine/retry won it) or the job no longer exists. + """ + from cron.jobs import claim_job_for_fire, get_job + from cron.scheduler import run_one_job + + if not claim_job_for_fire(job_id): + return False # another machine already claimed this fire + job = get_job(job_id) + if job is None: + return False # job removed (e.g. repeat-N exhausted) between arm and fire + return run_one_job(job, adapters=adapters, loop=loop) + + def reconcile(self) -> None: + """Converge the external registry toward jobs.json (the desired state): + arm missing one-shots, cancel orphaned ones, re-arm changed times. + Built-in: no-op.""" + return None + def resolve_cron_scheduler() -> "CronScheduler": """Return the active cron scheduler provider. diff --git a/tests/cron/test_scheduler_provider.py b/tests/cron/test_scheduler_provider.py index 8fdbb305a0f..2b2e159e2a3 100644 --- a/tests/cron/test_scheduler_provider.py +++ b/tests/cron/test_scheduler_provider.py @@ -262,3 +262,73 @@ def test_resolve_available_provider_is_used(monkeypatch): monkeypatch.setattr(pc, "load_cron_scheduler", lambda n: Fake()) prov = sp.resolve_cron_scheduler() assert prov.name == "fake" + + +# ── Phase 4B: additive hooks (on_jobs_changed / fire_due / reconcile) ──────── + + +def test_hooks_did_not_change_required_surface(): + """The additive hooks must NOT become abstractmethods — the Phase-1 guard + still holds (required surface is exactly name + start).""" + from cron.scheduler_provider import CronScheduler + + assert set(CronScheduler.__abstractmethods__) == {"name", "start"} + + +def test_builtin_inherits_hook_defaults(): + """The built-in inherits no-op defaults for the new hooks (it never needs + to override them).""" + from cron.scheduler_provider import InProcessCronScheduler + + p = InProcessCronScheduler() + assert p.on_jobs_changed() is None + assert p.reconcile() is None + # built-in does not override fire_due; it simply isn't called for built-in. + assert hasattr(p, "fire_due") + + +def test_fire_due_default_claims_then_runs(monkeypatch): + """The default fire_due claims via the store CAS, fetches the job, and runs + it through the shared run_one_job body.""" + import cron.jobs as jobs + import cron.scheduler as sched + from cron.scheduler_provider import InProcessCronScheduler + + ran = [] + monkeypatch.setattr(jobs, "claim_job_for_fire", lambda jid: True, raising=False) + monkeypatch.setattr(jobs, "get_job", lambda jid: {"id": jid, "name": "t"}) + monkeypatch.setattr(sched, "run_one_job", lambda job, **kw: ran.append(job["id"]) or True) + + assert InProcessCronScheduler().fire_due("j1") is True + assert ran == ["j1"] + + +def test_fire_due_lost_claim_does_not_run(monkeypatch): + """If the CAS claim is lost (another machine/retry won), fire_due returns + False and never runs the job.""" + import cron.jobs as jobs + import cron.scheduler as sched + from cron.scheduler_provider import InProcessCronScheduler + + ran = [] + monkeypatch.setattr(jobs, "claim_job_for_fire", lambda jid: False, raising=False) + monkeypatch.setattr(sched, "run_one_job", lambda job, **kw: ran.append(job["id"]) or True) + + assert InProcessCronScheduler().fire_due("j1") is False + assert ran == [] + + +def test_fire_due_missing_job_does_not_run(monkeypatch): + """If the job vanished between arm and fire (e.g. repeat-N exhausted), + fire_due returns False without running.""" + import cron.jobs as jobs + import cron.scheduler as sched + from cron.scheduler_provider import InProcessCronScheduler + + ran = [] + monkeypatch.setattr(jobs, "claim_job_for_fire", lambda jid: True, raising=False) + monkeypatch.setattr(jobs, "get_job", lambda jid: None) + monkeypatch.setattr(sched, "run_one_job", lambda job, **kw: ran.append(job["id"]) or True) + + assert InProcessCronScheduler().fire_due("gone") is False + assert ran == []