hermes-agent/cron/scheduler_provider.py
Ben 6ff5fd373b feat(cron): additive CronScheduler hooks (on_jobs_changed/fire_due/reconcile)
Phase 4B. Three NON-abstract hooks on the CronScheduler ABC, all with
built-in-safe defaults so the built-in inherits them without overriding and
test_abc_growth_stays_additive stays green (required surface still {name,
start}):

- on_jobs_changed(): post-mutation reconcile hook. Built-in no-op.
- fire_due(job_id): claim the job via the store CAS (claim_job_for_fire,
  Phase 4C) then run it through the shared run_one_job (Phase 4A). Returns
  False if the claim is lost or the job vanished (repeat-N exhausted between
  arm and fire). The inbound webhook (Phase 4E) routes here.
- reconcile(): converge the external registry toward jobs.json. Built-in no-op.

fire_due imports claim_job_for_fire/get_job/run_one_job INSIDE the method, so
this commits cleanly before Phase 4C lands claim_job_for_fire (import-time is
unaffected; tests monkeypatch it with raising=False).

Tests: required-surface-unchanged guard, built-in inherits no-op defaults, and
fire_due's three paths (claim+run, lost-claim→no-run, missing-job→no-run).
tests/cron/ green (20 in test_scheduler_provider.py).
2026-06-18 14:30:31 +10:00

177 lines
7.1 KiB
Python

"""CronScheduler provider interface (Axis B — the trigger).
⚠️ EXPERIMENTAL — this interface is validated by exactly ONE consumer (the
built-in) until an external provider (Chronos, Phase 4) shakes it out. Until
then the module path, method signatures, and start() kwargs MAY change without
a deprecation cycle. Once a second provider validates the shape it becomes
stable. Any growth MUST be additive (new optional method with a default), never
a changed signature on start() or a new abstractmethod.
A CronScheduler decides *when* a due job fires. It does NOT decide what firing
means: execution + delivery stay in cron.scheduler.run_job / _deliver_result,
shared by all providers. Providers must never reimplement agent construction or
delivery.
The built-in InProcessCronScheduler runs the historical 60s daemon-thread
ticker. Alternative providers (e.g. Chronos, a NAS-mediated managed-cron
provider for scale-to-zero deployments) live under plugins/cron/<name>/ and are
selected via the `cron.provider` config key (empty = built-in).
"""
from __future__ import annotations
import threading
from abc import ABC, abstractmethod
from typing import Any
class CronScheduler(ABC):
    """Trigger provider for cron (Axis B): it decides WHEN a due job fires.

    Only ``name`` and ``start`` are abstract. ``stop`` and ``is_available``
    ship with safe defaults, and the Phase-4 hooks (``on_jobs_changed``,
    ``fire_due``, ``reconcile``) are deliberately NON-abstract so the built-in
    provider satisfies the ABC without overriding any of them — see
    ``test_abc_growth_stays_additive``.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Short provider identifier, for example 'builtin' or 'chronos'."""

    def is_available(self) -> bool:
        """Report whether this provider can run in the current environment.

        Implementations MUST NOT perform network calls here. The built-in is
        always available; an external provider checks that its endpoint and
        credentials are configured. When a named provider answers False the
        resolver falls back to the built-in.
        """
        return True

    @abstractmethod
    def start(
        self,
        stop_event: threading.Event,
        *,
        adapters: Any = None,
        loop: Any = None,
        interval: int = 60,
    ) -> None:
        """Start firing due jobs.

        The built-in BLOCKS in its 60s loop until ``stop_event`` is set (the
        caller runs it inside a daemon thread). An external provider may
        instead register a schedule/webhook and return right away, but it
        must still honor ``stop_event`` for teardown.
        """

    def stop(self) -> None:
        """Optional eager teardown hook; does nothing by default.

        Setting the stop_event remains the primary stop signal. Providers
        that hold external resources (queue consumers, HTTP servers) should
        override this.
        """

    # --- Optional hooks for external providers (added Phase 4). --------------
    # Every hook has a safe default so the built-in works without overriding.
    # They must stay NON-abstract — see test_abc_growth_stays_additive.

    def on_jobs_changed(self) -> None:
        """Hook invoked after a successful store mutation.

        Mutations are create/update/remove/pause/resume. External providers
        reconcile their registry here (e.g. Chronos re-provisions or cancels
        the affected one-shot via NAS). The built-in does nothing because it
        re-reads jobs.json on every tick.
        """

    def fire_due(self, job_id: str, *, adapters: Any = None, loop: Any = None) -> bool:
        """Run one job NOW through the shared orchestrator.

        The inbound fire webhook calls this when an external scheduler
        signals that a job is due. The default first claims the job with a
        store-level compare-and-set (multi-machine at-most-once) and then
        executes it via the shared ``run_one_job`` body. The built-in never
        calls this because it has its own tick loop.

        Returns the result of ``run_one_job`` when THIS caller won the claim
        and the job still exists; False when the claim was lost (another
        machine/retry won it) or the job is gone.
        """
        # Imported lazily so that importing this module does not depend on
        # the store/orchestrator modules being importable.
        from cron.jobs import claim_job_for_fire, get_job
        from cron.scheduler import run_one_job

        if claim_job_for_fire(job_id):
            claimed_job = get_job(job_id)
            if claimed_job is not None:
                return run_one_job(claimed_job, adapters=adapters, loop=loop)
            # Claimed, but the job was removed (e.g. repeat-N exhausted)
            # between arm and fire: fall through to False.
        # Either another machine already claimed this fire or the job vanished.
        return False

    def reconcile(self) -> None:
        """Converge the external registry toward jobs.json (the desired state).

        That means arming missing one-shots, cancelling orphaned ones and
        re-arming those whose time changed. The built-in does nothing.
        """
def resolve_cron_scheduler() -> "CronScheduler":
    """Return the active cron scheduler provider.

    Reads ``cron.provider`` from config. An empty or absent value, or one of
    the built-in aliases (``builtin`` / ``in-process`` / ``inprocess``),
    selects the built-in. A named provider that is missing, fails to load, or
    reports ``is_available() == False`` falls back to the built-in with a
    warning — cron must never be left without a trigger.

    A failure while reading the config itself also falls back to the
    built-in; it is recorded at debug level (with traceback) rather than
    being swallowed silently, so a misconfiguration can be diagnosed.

    Returns:
        The resolved provider instance; always a usable scheduler.
    """
    import logging

    logger = logging.getLogger("cron.scheduler_provider")

    name = ""
    try:
        from hermes_cli.config import cfg_get, load_config

        name = (cfg_get(load_config(), "cron", "provider", default="") or "").strip()
    except Exception:
        # Best-effort by design: an unreadable config (or a non-string value)
        # must not prevent cron from starting. Keep the fallback, but leave a
        # trace instead of discarding the error entirely.
        logger.debug(
            "Could not read cron.provider from config; using built-in ticker",
            exc_info=True,
        )

    if not name or name in ("builtin", "in-process", "inprocess"):
        return InProcessCronScheduler()

    try:
        # Imported lazily so the plugin package is only touched when a
        # non-built-in provider is actually requested.
        from plugins.cron import load_cron_scheduler

        provider = load_cron_scheduler(name)
        if provider is None:
            logger.warning("cron.provider '%s' not found; using built-in ticker", name)
            return InProcessCronScheduler()
        if not provider.is_available():
            logger.warning("cron.provider '%s' not available; using built-in ticker", name)
            return InProcessCronScheduler()
        logger.info("Using cron scheduler provider: %s", provider.name)
        return provider
    except Exception as e:
        # Any loader/provider failure degrades to the built-in ticker.
        logger.warning(
            "Failed to load cron.provider '%s' (%s); using built-in ticker", name, e
        )
        return InProcessCronScheduler()
class InProcessCronScheduler(CronScheduler):
    """Built-in default provider: the historical in-process 60s ticker.

    ``start()`` stays inside the tick loop until ``stop_event`` is set, the
    same core loop as the pre-refactor ``_start_cron_ticker``. The caller is
    expected to run it in a daemon thread.
    """

    @property
    def name(self) -> str:
        return "builtin"

    def start(self, stop_event, *, adapters=None, loop=None, interval=60):
        """Tick repeatedly until ``stop_event`` is set, pausing ``interval``
        seconds between ticks (the pause ends early once the event is set)."""
        import logging
        from cron.scheduler import tick as run_tick

        log = logging.getLogger("cron.scheduler_provider")
        log.info("In-process cron scheduler started (interval=%ds)", interval)

        while True:
            if stop_event.is_set():
                break
            try:
                run_tick(verbose=False, adapters=adapters, loop=loop, sync=False)
            except Exception as tick_error:
                # A failing tick must not kill the ticker; note it and go on.
                log.debug("Cron tick error: %s", tick_error)
            # Doubles as the sleep between ticks and the wake-up on shutdown.
            stop_event.wait(interval)