feat(cron): additive CronScheduler hooks (on_jobs_changed/fire_due/reconcile)

Phase 4B. Three NON-abstract hooks on the CronScheduler ABC, all with
built-in-safe defaults so the built-in inherits them without overriding and
test_abc_growth_stays_additive stays green (required surface still {name,
start}):

- on_jobs_changed(): post-mutation reconcile hook. Built-in no-op.
- fire_due(job_id): claim the job via the store CAS (claim_job_for_fire,
  Phase 4C) then run it through the shared run_one_job (Phase 4A). Returns
  False if the claim is lost or the job vanished (repeat-N exhausted between
  arm and fire). The inbound webhook (Phase 4E) routes here.
- reconcile(): converge the external registry toward jobs.json. Built-in no-op.

fire_due imports claim_job_for_fire/get_job/run_one_job INSIDE the method, so
this commits cleanly before Phase 4C lands claim_job_for_fire (import-time is
unaffected; tests monkeypatch it with raising=False).

Tests: required-surface-unchanged guard, built-in inherits no-op defaults, and
fire_due's three paths (claim+run, lost-claim→no-run, missing-job→no-run).
tests/cron/ green (20 in test_scheduler_provider.py).
This commit is contained in:
Ben 2026-06-18 14:30:31 +10:00
parent 58b19a4f69
commit 6ff5fd373b
2 changed files with 109 additions and 0 deletions

View file

@ -71,6 +71,45 @@ class CronScheduler(ABC):
resources (queue consumers, HTTP servers)."""
return None
# --- Optional hooks for external providers (added Phase 4). --------------
# All default-safe so the built-in inherits working behavior without
# overriding. Keep these NON-abstract — see test_abc_growth_stays_additive.
def on_jobs_changed(self) -> None:
"""Called after a successful store mutation (create/update/remove/
pause/resume). External providers reconcile their registry here (e.g.
Chronos re-provisions/cancels the affected one-shot via NAS).
Built-in: no-op (it re-reads jobs.json on every tick)."""
return None
def fire_due(self, job_id: str, *, adapters: Any = None, loop: Any = None) -> bool:
"""Run a single job NOW via the shared orchestrator. Called by the
inbound fire webhook when an external scheduler signals a job is due.
The default claims the job with a store-level compare-and-set
(multi-machine at-most-once), then runs it via the shared
``run_one_job`` body. Built-in never calls this (it has its own tick
loop); an external provider routes its inbound fire here.
Returns True if THIS caller claimed and ran the job, False if the claim
was lost (another machine/retry won it) or the job no longer exists.
"""
from cron.jobs import claim_job_for_fire, get_job
from cron.scheduler import run_one_job
if not claim_job_for_fire(job_id):
return False # another machine already claimed this fire
job = get_job(job_id)
if job is None:
return False # job removed (e.g. repeat-N exhausted) between arm and fire
return run_one_job(job, adapters=adapters, loop=loop)
def reconcile(self) -> None:
"""Converge the external registry toward jobs.json (the desired state):
arm missing one-shots, cancel orphaned ones, re-arm changed times.
Built-in: no-op."""
return None
def resolve_cron_scheduler() -> "CronScheduler":
"""Return the active cron scheduler provider.

View file

@ -262,3 +262,73 @@ def test_resolve_available_provider_is_used(monkeypatch):
monkeypatch.setattr(pc, "load_cron_scheduler", lambda n: Fake())
prov = sp.resolve_cron_scheduler()
assert prov.name == "fake"
# ── Phase 4B: additive hooks (on_jobs_changed / fire_due / reconcile) ────────
def test_hooks_did_not_change_required_surface():
"""The additive hooks must NOT become abstractmethods — the Phase-1 guard
still holds (required surface is exactly name + start)."""
from cron.scheduler_provider import CronScheduler
assert set(CronScheduler.__abstractmethods__) == {"name", "start"}
def test_builtin_inherits_hook_defaults():
"""The built-in inherits no-op defaults for the new hooks (it never needs
to override them)."""
from cron.scheduler_provider import InProcessCronScheduler
p = InProcessCronScheduler()
assert p.on_jobs_changed() is None
assert p.reconcile() is None
# built-in does not override fire_due; it simply isn't called for built-in.
assert hasattr(p, "fire_due")
def test_fire_due_default_claims_then_runs(monkeypatch):
"""The default fire_due claims via the store CAS, fetches the job, and runs
it through the shared run_one_job body."""
import cron.jobs as jobs
import cron.scheduler as sched
from cron.scheduler_provider import InProcessCronScheduler
ran = []
monkeypatch.setattr(jobs, "claim_job_for_fire", lambda jid: True, raising=False)
monkeypatch.setattr(jobs, "get_job", lambda jid: {"id": jid, "name": "t"})
monkeypatch.setattr(sched, "run_one_job", lambda job, **kw: ran.append(job["id"]) or True)
assert InProcessCronScheduler().fire_due("j1") is True
assert ran == ["j1"]
def test_fire_due_lost_claim_does_not_run(monkeypatch):
"""If the CAS claim is lost (another machine/retry won), fire_due returns
False and never runs the job."""
import cron.jobs as jobs
import cron.scheduler as sched
from cron.scheduler_provider import InProcessCronScheduler
ran = []
monkeypatch.setattr(jobs, "claim_job_for_fire", lambda jid: False, raising=False)
monkeypatch.setattr(sched, "run_one_job", lambda job, **kw: ran.append(job["id"]) or True)
assert InProcessCronScheduler().fire_due("j1") is False
assert ran == []
def test_fire_due_missing_job_does_not_run(monkeypatch):
"""If the job vanished between arm and fire (e.g. repeat-N exhausted),
fire_due returns False without running."""
import cron.jobs as jobs
import cron.scheduler as sched
from cron.scheduler_provider import InProcessCronScheduler
ran = []
monkeypatch.setattr(jobs, "claim_job_for_fire", lambda jid: True, raising=False)
monkeypatch.setattr(jobs, "get_job", lambda jid: None)
monkeypatch.setattr(sched, "run_one_job", lambda job, **kw: ran.append(job["id"]) or True)
assert InProcessCronScheduler().fire_due("gone") is False
assert ran == []