From e6ff41ca9516cbca6470a56b1ab98939dbdb935a Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 18 Jun 2026 13:58:43 +1000 Subject: [PATCH] feat(cron): CronScheduler ABC + InProcessCronScheduler (provider #1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1 of the pluggable cron-scheduler refactor (Axis B — the trigger). No call-site changes; this phase only makes the abstraction exist + tested in isolation. Task 1.1: cron/scheduler_provider.py — the EXPERIMENTAL CronScheduler ABC. Required surface is name + start; is_available()/stop() carry safe defaults. is_available has a no-network invariant. Docstring marks it experimental until the Chronos provider (Phase 4) validates the shape. Task 1.2: InProcessCronScheduler wraps the historical 60s ticker loop, calling cron.scheduler.tick(sync=False) exactly as the raw ticker does. Uses stop_event.wait(interval) for responsive stop (both raw tickers already do). Tests: ABC-is-abstract, default-is_available, the InProcess loop drives tick and stops, stop() no-op, and test_abc_growth_stays_additive (the forward-compat guard: required abstractmethods must stay exactly {name, start}, so the three Phase-4 hooks land as NON-abstract additions). tick() internals in cron/scheduler.py are byte-unchanged (only new file added). Phase 0 characterization tests still green. Full tests/cron/: 445 passed. --- cron/scheduler_provider.py | 98 +++++++++++++++++++++++++++ tests/cron/test_scheduler_provider.py | 78 +++++++++++++++++++++ 2 files changed, 176 insertions(+) create mode 100644 cron/scheduler_provider.py diff --git a/cron/scheduler_provider.py b/cron/scheduler_provider.py new file mode 100644 index 00000000000..329cf4ae8a6 --- /dev/null +++ b/cron/scheduler_provider.py @@ -0,0 +1,98 @@ +"""CronScheduler provider interface (Axis B — the trigger). + +⚠️ EXPERIMENTAL — this interface is validated by exactly ONE consumer (the +built-in) until an external provider (Chronos, Phase 4) shakes it out. Until +then the module path, method signatures, and start() kwargs MAY change without +a deprecation cycle. Once a second provider validates the shape it becomes +stable. Any growth MUST be additive (new optional method with a default), never +a changed signature on start() or a new abstractmethod. + +A CronScheduler decides *when* a due job fires. It does NOT decide what firing +means: execution + delivery stay in cron.scheduler.run_job / _deliver_result, +shared by all providers. Providers must never reimplement agent construction or +delivery. + +The built-in InProcessCronScheduler runs the historical 60s daemon-thread +ticker. Alternative providers (e.g. Chronos, a NAS-mediated managed-cron +provider for scale-to-zero deployments) live under plugins/cron// and are +selected via the `cron.provider` config key (empty = built-in). +""" +from __future__ import annotations + +import threading +from abc import ABC, abstractmethod +from typing import Any + + +class CronScheduler(ABC): + """Axis-B trigger provider. Decides WHEN a due cron job fires. + + Required surface is intentionally minimal: ``name`` + ``start``. ``stop`` + and ``is_available`` carry safe defaults. The three Phase-4 hooks + (``on_jobs_changed`` / ``fire_due`` / ``reconcile``) are added later as + NON-abstract methods so the built-in keeps satisfying the ABC without + overriding them — see ``test_abc_growth_stays_additive``. + """ + + @property + @abstractmethod + def name(self) -> str: + """Short identifier, e.g. 'builtin', 'chronos'.""" + + def is_available(self) -> bool: + """Whether this provider can run in the current environment. + + MUST NOT make network calls. The built-in is always available; an + external provider checks for configured endpoint/credentials. When a + named provider returns False, the resolver falls back to the built-in. + """ + return True + + @abstractmethod + def start( + self, + stop_event: threading.Event, + *, + adapters: Any = None, + loop: Any = None, + interval: int = 60, + ) -> None: + """Begin firing due jobs. + + For the built-in this BLOCKS in the 60s loop until stop_event is set + (it is run inside a daemon thread by the caller, exactly as today). + An external provider may register a schedule/webhook and return + immediately; in that case it must still honor stop_event for teardown. + """ + + def stop(self) -> None: + """Optional eager teardown hook. Default no-op; setting the stop_event + is the primary stop signal. Override for providers holding external + resources (queue consumers, HTTP servers).""" + return None + + +class InProcessCronScheduler(CronScheduler): + """Default provider: the historical in-process 60s ticker. + + ``start()`` blocks in the tick loop until ``stop_event`` is set, identical + to the pre-refactor ``_start_cron_ticker`` core loop. The caller runs it in + a daemon thread. + """ + + @property + def name(self) -> str: + return "builtin" + + def start(self, stop_event, *, adapters=None, loop=None, interval=60): + import logging + from cron.scheduler import tick as cron_tick + + logger = logging.getLogger("cron.scheduler_provider") + logger.info("In-process cron scheduler started (interval=%ds)", interval) + while not stop_event.is_set(): + try: + cron_tick(verbose=False, adapters=adapters, loop=loop, sync=False) + except Exception as e: + logger.debug("Cron tick error: %s", e) + stop_event.wait(interval) diff --git a/tests/cron/test_scheduler_provider.py b/tests/cron/test_scheduler_provider.py index 1e94347dfa8..74b3891122c 100644 --- a/tests/cron/test_scheduler_provider.py +++ b/tests/cron/test_scheduler_provider.py @@ -81,3 +81,81 @@ def test_desktop_ticker_calls_tick_then_stops(): assert not t.is_alive(), "desktop ticker did not exit after stop_event was set" assert len(calls) >= 1, "desktop ticker never called tick()" assert calls[0].get("sync") is False + + +# ── Phase 1: CronScheduler ABC + InProcessCronScheduler ────────────────────── + + +def test_cronscheduler_is_abstract(): + """name + start are abstract — the bare ABC can't be instantiated.""" + import pytest + from cron.scheduler_provider import CronScheduler + + with pytest.raises(TypeError): + CronScheduler() + + +def test_cronscheduler_default_is_available_true(): + """is_available defaults to True (no-network) for a minimal subclass.""" + from cron.scheduler_provider import CronScheduler + + class Dummy(CronScheduler): + @property + def name(self): + return "dummy" + + def start(self, stop_event, **kw): + pass + + assert Dummy().is_available() is True + + +def test_abc_growth_stays_additive(): + """Forward-compat guard: the ABC's REQUIRED surface is exactly name+start. + + Any optional hook added later for the external provider + (on_jobs_changed/fire_due/reconcile) must be NON-abstract (carry a default), + so the built-in keeps satisfying the ABC without overriding them. This test + fails loudly if someone makes a future hook abstract (a breaking change that + would force every provider — including the built-in — to implement it). + """ + from cron.scheduler_provider import CronScheduler + + abstract = set(getattr(CronScheduler, "__abstractmethods__", set())) + assert abstract == {"name", "start"}, ( + f"CronScheduler abstractmethods changed to {abstract}; growth must be " + "additive (optional methods with defaults), not new abstract methods." + ) + + +def test_inprocess_provider_ticks_and_stops(): + """The built-in provider drives cron.scheduler.tick(sync=False) on a loop + and exits promptly when stop_event is set — same contract as the raw + ticker characterized above.""" + from cron.scheduler_provider import InProcessCronScheduler + + calls = [] + stop = threading.Event() + prov = InProcessCronScheduler() + assert prov.name == "builtin" + + with patch("cron.scheduler.tick", side_effect=lambda *a, **k: calls.append(k) or 0): + t = threading.Thread( + target=prov.start, args=(stop,), kwargs={"interval": 0}, daemon=True + ) + t.start() + time.sleep(0.2) + stop.set() + t.join(timeout=5) + + assert not t.is_alive(), "provider did not exit after stop_event was set" + assert len(calls) >= 1, "provider never called tick()" + assert calls[0].get("sync") is False + + +def test_inprocess_provider_stop_is_noop(): + """The default stop() hook is a safe no-op (the stop_event is the real + stop signal for the built-in).""" + from cron.scheduler_provider import InProcessCronScheduler + + assert InProcessCronScheduler().stop() is None