feat(cron): Chronos NAS-mediated managed-cron provider (scale-to-zero)

Phase 4D. The first non-default CronScheduler: plugins/cron/chronos/. Inert
unless cron.provider=chronos; resolve_cron_scheduler falls back to the built-in
if unavailable, so cron never loses its trigger.

Files:
- chronos/__init__.py — ChronosCronScheduler + register(ctx).
  * is_available(): config-only, NO network (portal_url + callback_url + a
    stored Nous access token via get_provider_auth_state). Returns False →
    resolver falls back to built-in.
  * start(): reconcile() then RETURN — no blocking loop, no 60s wake (DQ-1:
    this is what makes scale-to-zero real; the machine wakes only on a
    NAS→agent fire).
  * _arm_one_shot(job): POST NAS provision {job_id, fire_at, agent_callback_url,
    dedup_key=job_id:fire_at}. Agent owns the time → sub-minute fires survive
    (no scheduler 1-minute floor).
  * reconcile(): converge NAS arms toward jobs.json — arm missing/changed-time,
    cancel orphaned, skip paused. Cold process rebuilds from jobs.json +
    idempotent dedup_key.
  * on_jobs_changed(): reconcile (re-arm/cancel the affected one-shot).
  * fire_due(): ABC default (CAS claim + run_one_job) THEN re-arm the next
    one-shot. Job gone (one-shot done / repeat-N exhausted) → no re-arm.
- chronos/_nas_client.py — thin HTTP wrapper for provision/cancel/list using
  the agent's existing refresh-aware Nous token (resolve_nous_access_token).
  Names no scheduler vendor; holds no scheduler creds.
- chronos/plugin.yaml — discovery metadata.

INVARIANT: zero "qstash"/"upstash" hits in plugins/cron, gateway, hermes_cli,
website/docs — the external scheduler is a NAS-internal detail, never named
agent-side.

Tests (13, all NAS mocked, zero network): is_available off-without-config +
on-with-config + makes-no-network; arm payload incl. sub-minute + noop without
next_run; reconcile arms-all / cancels-orphan / skips-paused / skips-already-
armed; fire_due re-arms next / no re-arm when job gone / no re-arm when claim
lost.
This commit is contained in:
Ben 2026-06-18 14:40:56 +10:00
parent b01eee0c77
commit 4c8bbe6416
4 changed files with 576 additions and 0 deletions

View file

@ -0,0 +1,241 @@
"""Chronos — NAS-mediated managed cron provider (scale-to-zero).
Chronos (the Greek god of time, alongside Hermes) is the first non-default
``CronScheduler``. It lets a hosted gateway scale to zero while idle and still
fire cron jobs: instead of a 60s in-process ticker, it asks NAS to arm exactly
one external one-shot per job at that job's real next-fire time. NAS calls the
agent back at fire time over an authenticated webhook (``/api/cron/fire``); the
agent runs the job via the shared ``run_one_job`` body and re-arms the next
one-shot.
The external scheduler NAS uses is an internal NAS implementation detail
Chronos names no vendor, holds no scheduler credentials, and speaks only to
NAS's ``agent-cron`` endpoints with the agent's existing Nous token.
Design constraints (see the plan's DQ-1):
- start() arms all enabled jobs and RETURNS; it never blocks and never spawns
a periodic wake. Between fires the machine is truly at zero.
- reconcile runs only on a warm process (start / on_jobs_changed / piggybacked
on a fire), never as a periodic wake of a sleeping machine.
Inert unless ``cron.provider: chronos``. ``resolve_cron_scheduler`` falls back
to the built-in if Chronos is unavailable, so cron never loses its trigger.
Wire contract: ``docs/chronos-managed-cron-contract.md``.
"""
from __future__ import annotations
import logging
import threading
from typing import Any, Dict, Optional
from cron.scheduler_provider import CronScheduler
logger = logging.getLogger("cron.chronos")
def _cfg(*keys: str, default: Any = "") -> Any:
"""Read a cron.chronos.* config value (no network)."""
try:
from hermes_cli.config import cfg_get, load_config
return cfg_get(load_config(), *keys, default=default)
except Exception:
return default
class ChronosCronScheduler(CronScheduler):
"""NAS-mediated external cron provider."""
def __init__(self) -> None:
# In-memory map of job_id → fire_at we've asked NAS to arm. Best-effort
# cache; reconcile rebuilds desired state from jobs.json, so a cold
# process simply re-arms (idempotent via dedup_key).
self._armed: Dict[str, str] = {}
self._lock = threading.Lock()
self._client = None # lazily constructed (no network in is_available)
# -- identity / availability -----------------------------------------
@property
def name(self) -> str:
return "chronos"
def is_available(self) -> bool:
"""Config presence only — NO network.
Chronos needs a portal base URL, the agent's own publicly-reachable
callback URL (for NASagent fires), and a usable Nous token (the agent
is logged into the portal). If any is missing, resolve_cron_scheduler
falls back to the built-in ticker.
"""
if not (_cfg("cron", "chronos", "portal_url") and _cfg("cron", "chronos", "callback_url")):
return False
return self._have_nous_token()
def _have_nous_token(self) -> bool:
"""True if the agent has a Nous Portal login (no network call).
Checks the stored auth state for a Nous access token does NOT refresh
or hit the network (is_available must stay offline). The actual
refresh-aware token is resolved lazily at provision time.
"""
try:
from hermes_cli.auth import get_provider_auth_state
state = get_provider_auth_state("nous") or {}
return bool(state.get("access_token"))
except Exception:
return False
# -- client -----------------------------------------------------------
def _get_client(self):
if self._client is None:
from ._nas_client import NasCronClient
self._client = NasCronClient(_cfg("cron", "chronos", "portal_url"))
return self._client
def _callback_url(self) -> str:
return str(_cfg("cron", "chronos", "callback_url") or "")
# -- lifecycle --------------------------------------------------------
def start(self, stop_event, *, adapters=None, loop=None, interval=60):
"""Arm all enabled jobs via NAS, then RETURN immediately.
Does NOT block and does NOT spawn a 60s wake (DQ-1) that is the whole
point of scale-to-zero. The machine wakes only on a NASagent fire.
"""
try:
self.reconcile()
except Exception as e:
logger.warning("Chronos start() reconcile failed: %s", e)
# Intentionally return — no loop, no periodic wake.
def stop(self) -> None:
return None
def on_jobs_changed(self) -> None:
"""A job was created/updated/removed/paused/resumed — reconcile the NAS
registry so the affected one-shot is (re-)armed or cancelled."""
try:
self.reconcile()
except Exception as e:
logger.debug("Chronos on_jobs_changed reconcile failed: %s", e)
# -- arming -----------------------------------------------------------
def _arm_one_shot(self, job: Dict[str, Any]) -> None:
"""Ask NAS to arm exactly one one-shot at the job's next_run_at.
The agent computes the time; NAS+its scheduler are the dumb executor.
Idempotent per (job_id, fire_at) via dedup_key, so re-arming the same
fire is a no-op NAS-side.
"""
job_id = job["id"]
fire_at = job.get("next_run_at")
if not fire_at:
return
dedup_key = f"{job_id}:{fire_at}"
self._get_client().provision(
job_id=job_id,
fire_at=fire_at,
agent_callback_url=self._callback_url(),
dedup_key=dedup_key,
)
with self._lock:
self._armed[job_id] = fire_at
def _cancel(self, job_id: str) -> None:
try:
self._get_client().cancel(job_id=job_id)
finally:
with self._lock:
self._armed.pop(job_id, None)
def _list_armed(self) -> Dict[str, str]:
"""Observed armed one-shots: job_id → fire_at.
Prefer the in-memory map (warm process); on a cold/empty map, ask NAS
(best-effort). If NAS list fails, return what we have reconcile then
re-arms desired jobs idempotently.
"""
with self._lock:
if self._armed:
return dict(self._armed)
try:
observed = {
item["job_id"]: item.get("fire_at", "")
for item in self._get_client().list_armed()
if item.get("job_id")
}
with self._lock:
self._armed.update(observed)
return observed
except Exception as e:
logger.debug("Chronos _list_armed failed (will re-arm idempotently): %s", e)
return {}
# -- reconcile --------------------------------------------------------
def reconcile(self) -> None:
"""Converge the NAS-armed one-shots toward jobs.json (desired state):
arm missing / re-arm changed-time, cancel orphaned."""
from cron.jobs import load_jobs
desired: Dict[str, str] = {
j["id"]: j["next_run_at"]
for j in load_jobs()
if j.get("enabled") and j.get("next_run_at") and j.get("state") != "paused"
}
observed = self._list_armed()
# Arm missing or changed-time.
for job_id, fire_at in desired.items():
if observed.get(job_id) != fire_at:
# Re-fetch the full job dict to arm (need the whole record).
from cron.jobs import get_job
job = get_job(job_id)
if job:
try:
self._arm_one_shot(job)
except Exception as e:
logger.warning("Chronos failed to arm job %s: %s", job_id, e)
# Cancel orphans (armed but no longer desired).
for job_id in list(observed.keys()):
if job_id not in desired:
try:
self._cancel(job_id)
except Exception as e:
logger.warning("Chronos failed to cancel orphan %s: %s", job_id, e)
# -- fire -------------------------------------------------------------
def fire_due(self, job_id: str, *, adapters: Any = None, loop: Any = None) -> bool:
"""Run the due job (claim + run_one_job via the ABC default), then
re-arm the NEXT one-shot through NAS.
Re-arm happens AFTER the run so next_run_at reflects the completed fire.
If the job is gone (one-shot completed / repeat-N exhausted), get_job
returns None nothing to re-arm (the schedule naturally stops).
"""
ran = super().fire_due(job_id, adapters=adapters, loop=loop)
if ran:
from cron.jobs import get_job
job = get_job(job_id)
if job and job.get("enabled") and job.get("next_run_at"):
try:
self._arm_one_shot(job)
except Exception as e:
logger.warning("Chronos failed to re-arm job %s after fire: %s", job_id, e)
return ran
def register(ctx) -> None:
"""Plugin entrypoint — register the Chronos provider with the loader.
Mirrors the memory-plugin shape; plugins/cron discovery calls this and
collects the provider via register_cron_scheduler.
"""
ctx.register_cron_scheduler(ChronosCronScheduler())

View file

@ -0,0 +1,123 @@
"""Thin HTTP client for the agent → NAS ``agent-cron`` endpoints (Chronos).
The Chronos provider speaks ONLY to NAS it names no scheduler vendor and
holds no scheduler credentials. NAS owns the external scheduler (an internal
implementation detail) and that scheduler's account; the agent just asks NAS to
"arm a one-shot at time T" / "cancel" / "list", authenticated with the agent's
existing Nous Portal access token (the same token it already uses to call the
portal no new secret).
Wire contract: ``docs/chronos-managed-cron-contract.md``.
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
logger = logging.getLogger("cron.chronos")
# Endpoint paths under the portal base URL.
_PROVISION_PATH = "/api/agent-cron/provision"
_CANCEL_PATH = "/api/agent-cron/cancel"
_LIST_PATH = "/api/agent-cron/list"
class NasCronClientError(RuntimeError):
"""Raised when a NAS agent-cron call fails (non-2xx or transport error)."""
class NasCronClient:
"""Minimal client for the agent→NAS provision/cancel/list endpoints.
Uses the agent's refresh-aware Nous access token for auth. No scheduler
vendor, no scheduler creds NAS hides all of that behind these three calls.
"""
def __init__(self, portal_url: str, *, timeout_seconds: float = 15.0) -> None:
self.portal_url = portal_url.rstrip("/")
self.timeout_seconds = timeout_seconds
# -- auth -------------------------------------------------------------
def _access_token(self) -> str:
"""The agent's existing Nous Portal access token (refresh-aware)."""
from hermes_cli.auth import resolve_nous_access_token
return resolve_nous_access_token()
def _headers(self) -> Dict[str, str]:
return {
"Authorization": f"Bearer {self._access_token()}",
"Content-Type": "application/json",
}
# -- HTTP -------------------------------------------------------------
def _post(self, path: str, body: Dict[str, Any]) -> Dict[str, Any]:
import requests # lazy: agent already depends on requests
url = f"{self.portal_url}{path}"
try:
resp = requests.post(
url, json=body, headers=self._headers(), timeout=self.timeout_seconds
)
except Exception as e:
raise NasCronClientError(f"POST {path} failed: {e}") from e
if resp.status_code // 100 != 2:
raise NasCronClientError(
f"POST {path} returned {resp.status_code}: {resp.text[:200]}"
)
try:
return resp.json() if resp.content else {}
except Exception:
return {}
def _get(self, path: str, params: Dict[str, Any]) -> Dict[str, Any]:
import requests
url = f"{self.portal_url}{path}"
try:
resp = requests.get(
url, params=params, headers=self._headers(), timeout=self.timeout_seconds
)
except Exception as e:
raise NasCronClientError(f"GET {path} failed: {e}") from e
if resp.status_code // 100 != 2:
raise NasCronClientError(
f"GET {path} returned {resp.status_code}: {resp.text[:200]}"
)
try:
return resp.json() if resp.content else {}
except Exception:
return {}
# -- endpoints --------------------------------------------------------
def provision(self, *, job_id: str, fire_at: str, agent_callback_url: str,
dedup_key: str) -> Dict[str, Any]:
"""Ask NAS to arm a one-shot for ``job_id`` at ``fire_at`` (ISO 8601).
``dedup_key`` (``{job_id}:{fire_at}``) makes re-arming the same fire
idempotent NAS-side. Returns the NAS response (e.g. ``{schedule_id}``).
"""
return self._post(_PROVISION_PATH, {
"job_id": job_id,
"fire_at": fire_at,
"agent_callback_url": agent_callback_url,
"dedup_key": dedup_key,
})
def cancel(self, *, job_id: str) -> Dict[str, Any]:
"""Ask NAS to cancel any armed one-shot for ``job_id``."""
return self._post(_CANCEL_PATH, {"job_id": job_id})
def list_armed(self) -> List[Dict[str, Any]]:
"""List the one-shots NAS currently has armed for this agent.
Returns a list of ``{job_id, fire_at, schedule_id}``. Best-effort: used
by reconcile to find orphaned arms on a cold process; on error the
caller falls back to idempotent re-arm of all desired jobs.
"""
data = self._get(_LIST_PATH, {})
items = data.get("armed") if isinstance(data, dict) else None
return items if isinstance(items, list) else []

View file

@ -0,0 +1,9 @@
name: chronos
description: >-
Chronos — NAS-mediated managed cron provider for scale-to-zero hosted agents.
Delegates the "wake me at time T" trigger to Nous infrastructure so an idle
gateway can scale to zero and still fire cron jobs. The agent computes each
job's next-fire time and asks NAS to arm a one-shot; NAS calls the agent back
at fire time over an authenticated webhook. Inert unless cron.provider=chronos.
version: 1.0.0
author: Nous Research

View file

@ -0,0 +1,203 @@
"""Unit tests for the Chronos NAS-mediated cron provider (Phase 4D).
All NAS calls are mocked ZERO live network. These prove:
- is_available is config-only (no network), false without config.
- one-shot arming sends the right provision payload (incl. sub-minute fires
the agent owns the time, so there's no 1-minute floor).
- reconcile arms missing, cancels orphaned, skips paused.
- fire_due re-arms the next one-shot after a successful run, and repeat-N
(job gone) stops re-arming.
"""
import pytest
@pytest.fixture
def temp_home(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
yield tmp_path
@pytest.fixture
def chronos(monkeypatch):
"""A ChronosCronScheduler with a fake NAS client capturing calls."""
from plugins.cron.chronos import ChronosCronScheduler
class FakeClient:
def __init__(self):
self.provisions = []
self.cancels = []
self._armed = []
def provision(self, *, job_id, fire_at, agent_callback_url, dedup_key):
self.provisions.append({
"job_id": job_id, "fire_at": fire_at,
"agent_callback_url": agent_callback_url, "dedup_key": dedup_key,
})
return {"schedule_id": f"sched-{job_id}"}
def cancel(self, *, job_id):
self.cancels.append(job_id)
return {}
def list_armed(self):
return list(self._armed)
prov = ChronosCronScheduler()
fake = FakeClient()
prov._client = fake
# callback_url is read via _cfg; patch the module helper to avoid config.
monkeypatch.setattr("plugins.cron.chronos._cfg",
lambda *k, default="": "https://agent.example/" if k[-1] == "callback_url" else "https://portal.test")
return prov, fake
# -- is_available -------------------------------------------------------------
def test_is_available_false_without_config(temp_home, monkeypatch):
from plugins.cron.chronos import ChronosCronScheduler
monkeypatch.setattr("plugins.cron.chronos._cfg", lambda *k, default="": "")
assert ChronosCronScheduler().is_available() is False
def test_is_available_true_with_config_and_token(temp_home, monkeypatch):
import plugins.cron.chronos as mod
from plugins.cron.chronos import ChronosCronScheduler
monkeypatch.setattr(mod, "_cfg", lambda *k, default="": "https://x" )
monkeypatch.setattr("hermes_cli.auth.get_provider_auth_state",
lambda pid: {"access_token": "tok"})
assert ChronosCronScheduler().is_available() is True
def test_is_available_makes_no_network(temp_home, monkeypatch):
"""is_available must not construct the NAS client / hit network."""
import plugins.cron.chronos as mod
from plugins.cron.chronos import ChronosCronScheduler
monkeypatch.setattr(mod, "_cfg", lambda *k, default="": "https://x")
monkeypatch.setattr("hermes_cli.auth.get_provider_auth_state",
lambda pid: {"access_token": "tok"})
p = ChronosCronScheduler()
def explode():
raise AssertionError("is_available must not build the NAS client")
monkeypatch.setattr(p, "_get_client", explode)
assert p.is_available() is True # did not call _get_client
# -- arming -------------------------------------------------------------------
def test_arm_one_shot_sends_provision(chronos):
prov, fake = chronos
prov._arm_one_shot({"id": "j1", "next_run_at": "2026-06-18T12:00:00+00:00"})
assert len(fake.provisions) == 1
p = fake.provisions[0]
assert p["job_id"] == "j1"
assert p["fire_at"] == "2026-06-18T12:00:00+00:00"
assert p["dedup_key"] == "j1:2026-06-18T12:00:00+00:00"
assert p["agent_callback_url"] == "https://agent.example/"
def test_arm_one_shot_preserves_sub_minute_fire(chronos):
"""Sub-minute fire times survive — the agent owns the time, so there's no
1-minute scheduler floor."""
prov, fake = chronos
prov._arm_one_shot({"id": "j2", "next_run_at": "2026-06-18T12:00:30+00:00"})
assert fake.provisions[0]["fire_at"] == "2026-06-18T12:00:30+00:00"
def test_arm_one_shot_noop_without_next_run(chronos):
prov, fake = chronos
prov._arm_one_shot({"id": "j3", "next_run_at": None})
assert fake.provisions == []
# -- reconcile ----------------------------------------------------------------
def test_reconcile_arms_all_enabled(temp_home, chronos, monkeypatch):
prov, fake = chronos
jobs = [
{"id": "a", "enabled": True, "next_run_at": "2026-06-18T12:00:00+00:00", "state": "scheduled"},
{"id": "b", "enabled": True, "next_run_at": "2026-06-18T12:05:00+00:00", "state": "scheduled"},
]
monkeypatch.setattr("cron.jobs.load_jobs", lambda: jobs)
monkeypatch.setattr("cron.jobs.get_job", lambda jid: next(j for j in jobs if j["id"] == jid))
prov.reconcile()
assert {p["job_id"] for p in fake.provisions} == {"a", "b"}
assert fake.cancels == []
def test_reconcile_cancels_orphan_arms_desired(temp_home, chronos, monkeypatch):
prov, fake = chronos
# NAS already has a stale arm for deleted job "gone".
prov._armed = {"gone": "2026-06-18T11:00:00+00:00"}
jobs = [{"id": "a", "enabled": True, "next_run_at": "2026-06-18T12:00:00+00:00", "state": "scheduled"}]
monkeypatch.setattr("cron.jobs.load_jobs", lambda: jobs)
monkeypatch.setattr("cron.jobs.get_job", lambda jid: next((j for j in jobs if j["id"] == jid), None))
prov.reconcile()
assert [p["job_id"] for p in fake.provisions] == ["a"]
assert fake.cancels == ["gone"]
def test_reconcile_skips_paused(temp_home, chronos, monkeypatch):
prov, fake = chronos
jobs = [{"id": "p", "enabled": True, "next_run_at": "2026-06-18T12:00:00+00:00", "state": "paused"}]
monkeypatch.setattr("cron.jobs.load_jobs", lambda: jobs)
monkeypatch.setattr("cron.jobs.get_job", lambda jid: next((j for j in jobs if j["id"] == jid), None))
prov.reconcile()
assert fake.provisions == []
def test_reconcile_skips_already_armed_same_time(temp_home, chronos, monkeypatch):
prov, fake = chronos
prov._armed = {"a": "2026-06-18T12:00:00+00:00"}
jobs = [{"id": "a", "enabled": True, "next_run_at": "2026-06-18T12:00:00+00:00", "state": "scheduled"}]
monkeypatch.setattr("cron.jobs.load_jobs", lambda: jobs)
monkeypatch.setattr("cron.jobs.get_job", lambda jid: jobs[0])
prov.reconcile()
assert fake.provisions == [] # already armed at the same time → no re-arm
# -- fire_due re-arm ----------------------------------------------------------
def test_fire_due_rearms_next_oneshot(chronos, monkeypatch):
prov, fake = chronos
# super().fire_due runs the job; stub the ABC default to "ran".
monkeypatch.setattr("cron.scheduler_provider.CronScheduler.fire_due",
lambda self, jid, **kw: True)
monkeypatch.setattr("cron.jobs.get_job",
lambda jid: {"id": jid, "enabled": True, "next_run_at": "2026-06-18T12:05:00+00:00"})
assert prov.fire_due("j1") is True
assert [p["job_id"] for p in fake.provisions] == ["j1"]
assert fake.provisions[0]["fire_at"] == "2026-06-18T12:05:00+00:00"
def test_fire_due_no_rearm_when_job_gone(chronos, monkeypatch):
"""repeat-N exhausted / one-shot completed → mark_job_run deleted the job →
get_job None no re-arm (the schedule stops cleanly)."""
prov, fake = chronos
monkeypatch.setattr("cron.scheduler_provider.CronScheduler.fire_due",
lambda self, jid, **kw: True)
monkeypatch.setattr("cron.jobs.get_job", lambda jid: None)
assert prov.fire_due("j1") is True
assert fake.provisions == []
def test_fire_due_no_rearm_when_claim_lost(chronos, monkeypatch):
"""If the run didn't happen (claim lost), don't re-arm."""
prov, fake = chronos
monkeypatch.setattr("cron.scheduler_provider.CronScheduler.fire_due",
lambda self, jid, **kw: False)
assert prov.fire_due("j1") is False
assert fake.provisions == []