feat(cron): CronScheduler ABC + InProcessCronScheduler (provider #1)

Phase 1 of the pluggable cron-scheduler refactor (Axis B — the trigger).
No call-site changes; this phase only makes the abstraction exist + tested
in isolation.

Task 1.1: cron/scheduler_provider.py — the EXPERIMENTAL CronScheduler ABC.
  Required surface is name + start; is_available()/stop() carry safe defaults.
  is_available has a no-network invariant. Docstring marks it experimental
  until the Chronos provider (Phase 4) validates the shape.
Task 1.2: InProcessCronScheduler wraps the historical 60s ticker loop, calling
  cron.scheduler.tick(sync=False) exactly as the raw ticker does. Uses
  stop_event.wait(interval) for responsive stop (both raw tickers already do).

Tests: ABC-is-abstract, default-is_available, the InProcess loop drives tick
and stops, stop() no-op, and test_abc_growth_stays_additive (the forward-compat
guard: required abstractmethods must stay exactly {name, start}, so the three
Phase-4 hooks land as NON-abstract additions).

tick() internals in cron/scheduler.py are byte-unchanged (only new file added).
Phase 0 characterization tests still green. Full tests/cron/: 445 passed.
This commit is contained in:
Ben 2026-06-18 13:58:43 +10:00
parent a657397769
commit e6ff41ca95
2 changed files with 176 additions and 0 deletions

View file

@ -0,0 +1,98 @@
"""CronScheduler provider interface (Axis B — the trigger).
EXPERIMENTAL this interface is validated by exactly ONE consumer (the
built-in) until an external provider (Chronos, Phase 4) shakes it out. Until
then the module path, method signatures, and start() kwargs MAY change without
a deprecation cycle. Once a second provider validates the shape it becomes
stable. Any growth MUST be additive (new optional method with a default), never
a changed signature on start() or a new abstractmethod.
A CronScheduler decides *when* a due job fires. It does NOT decide what firing
means: execution + delivery stay in cron.scheduler.run_job / _deliver_result,
shared by all providers. Providers must never reimplement agent construction or
delivery.
The built-in InProcessCronScheduler runs the historical 60s daemon-thread
ticker. Alternative providers (e.g. Chronos, a NAS-mediated managed-cron
provider for scale-to-zero deployments) live under plugins/cron/<name>/ and are
selected via the `cron.provider` config key (empty = built-in).
"""
from __future__ import annotations
import threading
from abc import ABC, abstractmethod
from typing import Any
class CronScheduler(ABC):
"""Axis-B trigger provider. Decides WHEN a due cron job fires.
Required surface is intentionally minimal: ``name`` + ``start``. ``stop``
and ``is_available`` carry safe defaults. The three Phase-4 hooks
(``on_jobs_changed`` / ``fire_due`` / ``reconcile``) are added later as
NON-abstract methods so the built-in keeps satisfying the ABC without
overriding them see ``test_abc_growth_stays_additive``.
"""
@property
@abstractmethod
def name(self) -> str:
"""Short identifier, e.g. 'builtin', 'chronos'."""
def is_available(self) -> bool:
"""Whether this provider can run in the current environment.
MUST NOT make network calls. The built-in is always available; an
external provider checks for configured endpoint/credentials. When a
named provider returns False, the resolver falls back to the built-in.
"""
return True
@abstractmethod
def start(
self,
stop_event: threading.Event,
*,
adapters: Any = None,
loop: Any = None,
interval: int = 60,
) -> None:
"""Begin firing due jobs.
For the built-in this BLOCKS in the 60s loop until stop_event is set
(it is run inside a daemon thread by the caller, exactly as today).
An external provider may register a schedule/webhook and return
immediately; in that case it must still honor stop_event for teardown.
"""
def stop(self) -> None:
"""Optional eager teardown hook. Default no-op; setting the stop_event
is the primary stop signal. Override for providers holding external
resources (queue consumers, HTTP servers)."""
return None
class InProcessCronScheduler(CronScheduler):
"""Default provider: the historical in-process 60s ticker.
``start()`` blocks in the tick loop until ``stop_event`` is set, identical
to the pre-refactor ``_start_cron_ticker`` core loop. The caller runs it in
a daemon thread.
"""
@property
def name(self) -> str:
return "builtin"
def start(self, stop_event, *, adapters=None, loop=None, interval=60):
import logging
from cron.scheduler import tick as cron_tick
logger = logging.getLogger("cron.scheduler_provider")
logger.info("In-process cron scheduler started (interval=%ds)", interval)
while not stop_event.is_set():
try:
cron_tick(verbose=False, adapters=adapters, loop=loop, sync=False)
except Exception as e:
logger.debug("Cron tick error: %s", e)
stop_event.wait(interval)

View file

@ -81,3 +81,81 @@ def test_desktop_ticker_calls_tick_then_stops():
assert not t.is_alive(), "desktop ticker did not exit after stop_event was set"
assert len(calls) >= 1, "desktop ticker never called tick()"
assert calls[0].get("sync") is False
# ── Phase 1: CronScheduler ABC + InProcessCronScheduler ──────────────────────
def test_cronscheduler_is_abstract():
"""name + start are abstract — the bare ABC can't be instantiated."""
import pytest
from cron.scheduler_provider import CronScheduler
with pytest.raises(TypeError):
CronScheduler()
def test_cronscheduler_default_is_available_true():
"""is_available defaults to True (no-network) for a minimal subclass."""
from cron.scheduler_provider import CronScheduler
class Dummy(CronScheduler):
@property
def name(self):
return "dummy"
def start(self, stop_event, **kw):
pass
assert Dummy().is_available() is True
def test_abc_growth_stays_additive():
"""Forward-compat guard: the ABC's REQUIRED surface is exactly name+start.
Any optional hook added later for the external provider
(on_jobs_changed/fire_due/reconcile) must be NON-abstract (carry a default),
so the built-in keeps satisfying the ABC without overriding them. This test
fails loudly if someone makes a future hook abstract (a breaking change that
would force every provider including the built-in to implement it).
"""
from cron.scheduler_provider import CronScheduler
abstract = set(getattr(CronScheduler, "__abstractmethods__", set()))
assert abstract == {"name", "start"}, (
f"CronScheduler abstractmethods changed to {abstract}; growth must be "
"additive (optional methods with defaults), not new abstract methods."
)
def test_inprocess_provider_ticks_and_stops():
"""The built-in provider drives cron.scheduler.tick(sync=False) on a loop
and exits promptly when stop_event is set same contract as the raw
ticker characterized above."""
from cron.scheduler_provider import InProcessCronScheduler
calls = []
stop = threading.Event()
prov = InProcessCronScheduler()
assert prov.name == "builtin"
with patch("cron.scheduler.tick", side_effect=lambda *a, **k: calls.append(k) or 0):
t = threading.Thread(
target=prov.start, args=(stop,), kwargs={"interval": 0}, daemon=True
)
t.start()
time.sleep(0.2)
stop.set()
t.join(timeout=5)
assert not t.is_alive(), "provider did not exit after stop_event was set"
assert len(calls) >= 1, "provider never called tick()"
assert calls[0].get("sync") is False
def test_inprocess_provider_stop_is_noop():
"""The default stop() hook is a safe no-op (the stop_event is the real
stop signal for the built-in)."""
from cron.scheduler_provider import InProcessCronScheduler
assert InProcessCronScheduler().stop() is None