diff --git a/plugins/cron/chronos/__init__.py b/plugins/cron/chronos/__init__.py
new file mode 100644
index 00000000000..1ec5a457763
--- /dev/null
+++ b/plugins/cron/chronos/__init__.py
@@ -0,0 +1,257 @@
+"""Chronos — NAS-mediated managed cron provider (scale-to-zero).
+
+Chronos (the Greek god of time, alongside Hermes) is the first non-default
+``CronScheduler``. It lets a hosted gateway scale to zero while idle and still
+fire cron jobs: instead of a 60s in-process ticker, it asks NAS to arm exactly
+one external one-shot per job at that job's real next-fire time. NAS calls the
+agent back at fire time over an authenticated webhook (``/api/cron/fire``); the
+agent runs the job via the shared ``run_one_job`` body and re-arms the next
+one-shot.
+
+The external scheduler NAS uses is an internal NAS implementation detail —
+Chronos names no vendor, holds no scheduler credentials, and speaks only to
+NAS's ``agent-cron`` endpoints with the agent's existing Nous token.
+
+Design constraints (see the plan's DQ-1):
+  - start() arms all enabled jobs and RETURNS; it never blocks and never spawns
+    a periodic wake. Between fires the machine is truly at zero.
+  - reconcile runs only on a warm process (start / on_jobs_changed / piggybacked
+    on a fire), never as a periodic wake of a sleeping machine.
+
+Inert unless ``cron.provider: chronos``. ``resolve_cron_scheduler`` falls back
+to the built-in if Chronos is unavailable, so cron never loses its trigger.
+
+Wire contract: ``docs/chronos-managed-cron-contract.md``.
+"""
+
+from __future__ import annotations
+
+import logging
+import threading
+from typing import Any, Dict, Optional
+
+from cron.scheduler_provider import CronScheduler
+
+logger = logging.getLogger("cron.chronos")
+
+
+def _cfg(*keys: str, default: Any = "") -> Any:
+    """Read a cron.chronos.* config value (no network)."""
+    try:
+        from hermes_cli.config import cfg_get, load_config
+        return cfg_get(load_config(), *keys, default=default)
+    except Exception:
+        return default
+
+
+class ChronosCronScheduler(CronScheduler):
+    """NAS-mediated external cron provider."""
+
+    def __init__(self) -> None:
+        # In-memory map of job_id → fire_at we've asked NAS to arm. Best-effort
+        # cache; reconcile rebuilds desired state from jobs.json, so a cold
+        # process simply re-arms (idempotent via dedup_key).
+        self._armed: Dict[str, str] = {}
+        self._lock = threading.Lock()
+        self._client = None  # lazily constructed (no network in is_available)
+
+    # -- identity / availability -----------------------------------------
+
+    @property
+    def name(self) -> str:
+        return "chronos"
+
+    def is_available(self) -> bool:
+        """Config presence only — NO network.
+
+        Chronos needs a portal base URL, the agent's own publicly-reachable
+        callback URL (for NAS→agent fires), and a usable Nous token (the agent
+        is logged into the portal). If any is missing, resolve_cron_scheduler
+        falls back to the built-in ticker.
+        """
+        if not (_cfg("cron", "chronos", "portal_url") and _cfg("cron", "chronos", "callback_url")):
+            return False
+        return self._have_nous_token()
+
+    def _have_nous_token(self) -> bool:
+        """True if the agent has a Nous Portal login (no network call).
+
+        Checks the stored auth state for a Nous access token — does NOT refresh
+        or hit the network (is_available must stay offline). The actual
+        refresh-aware token is resolved lazily at provision time.
+        """
+        try:
+            from hermes_cli.auth import get_provider_auth_state
+            state = get_provider_auth_state("nous") or {}
+            return bool(state.get("access_token"))
+        except Exception:
+            return False
+
+    # -- client -----------------------------------------------------------
+
+    def _get_client(self):
+        if self._client is None:
+            from ._nas_client import NasCronClient
+            self._client = NasCronClient(_cfg("cron", "chronos", "portal_url"))
+        return self._client
+
+    def _callback_url(self) -> str:
+        return str(_cfg("cron", "chronos", "callback_url") or "")
+
+    # -- lifecycle --------------------------------------------------------
+
+    def start(self, stop_event, *, adapters=None, loop=None, interval=60):
+        """Arm all enabled jobs via NAS, then RETURN immediately.
+
+        Does NOT block and does NOT spawn a 60s wake (DQ-1) — that is the whole
+        point of scale-to-zero. The machine wakes only on a NAS→agent fire.
+        """
+        try:
+            self.reconcile()
+        except Exception as e:
+            logger.warning("Chronos start() reconcile failed: %s", e)
+        # Intentionally return — no loop, no periodic wake.
+
+    def stop(self) -> None:
+        return None
+
+    def on_jobs_changed(self) -> None:
+        """A job was created/updated/removed/paused/resumed — reconcile the NAS
+        registry so the affected one-shot is (re-)armed or cancelled."""
+        try:
+            self.reconcile()
+        except Exception as e:
+            # Warning, not debug: a missed reconcile can leave an edited job
+            # un-armed until the next warm event, so it must be visible.
+            logger.warning("Chronos on_jobs_changed reconcile failed: %s", e)
+
+    # -- arming -----------------------------------------------------------
+
+    def _arm_one_shot(self, job: Dict[str, Any]) -> None:
+        """Ask NAS to arm exactly one one-shot at the job's next_run_at.
+
+        The agent computes the time; NAS+its scheduler are the dumb executor.
+        Idempotent per (job_id, fire_at) via dedup_key, so re-arming the same
+        fire is a no-op NAS-side.
+        """
+        job_id = job["id"]
+        fire_at = job.get("next_run_at")
+        if not fire_at:
+            return
+        dedup_key = f"{job_id}:{fire_at}"
+        self._get_client().provision(
+            job_id=job_id,
+            fire_at=fire_at,
+            agent_callback_url=self._callback_url(),
+            dedup_key=dedup_key,
+        )
+        with self._lock:
+            self._armed[job_id] = fire_at
+
+    def _cancel(self, job_id: str) -> None:
+        """Cancel the NAS-side one-shot for ``job_id``.
+
+        The local entry is dropped only after the NAS call succeeds. If it
+        raises, the entry stays so the next reconcile still sees the orphan
+        and retries the cancel instead of silently forgetting it.
+        """
+        self._get_client().cancel(job_id=job_id)
+        with self._lock:
+            self._armed.pop(job_id, None)
+
+    def _list_armed(self) -> Dict[str, str]:
+        """Observed armed one-shots: job_id → fire_at.
+
+        Prefer the in-memory map (warm process); on a cold/empty map, ask NAS
+        (best-effort). If NAS list fails, return what we have — reconcile then
+        re-arms desired jobs idempotently.
+        """
+        with self._lock:
+            if self._armed:
+                return dict(self._armed)
+        try:
+            observed = {
+                item["job_id"]: item.get("fire_at", "")
+                for item in self._get_client().list_armed()
+                if item.get("job_id")
+            }
+            with self._lock:
+                self._armed.update(observed)
+            return observed
+        except Exception as e:
+            logger.debug("Chronos _list_armed failed (will re-arm idempotently): %s", e)
+            return {}
+
+    # -- reconcile --------------------------------------------------------
+
+    def reconcile(self) -> None:
+        """Converge the NAS-armed one-shots toward jobs.json (desired state):
+        arm missing / re-arm changed-time, cancel orphaned."""
+        from cron.jobs import get_job, load_jobs
+
+        desired: Dict[str, str] = {
+            j["id"]: j["next_run_at"]
+            for j in load_jobs()
+            if j.get("enabled") and j.get("next_run_at") and j.get("state") != "paused"
+        }
+        observed = self._list_armed()
+
+        # Arm missing or changed-time.
+        for job_id, fire_at in desired.items():
+            if observed.get(job_id) == fire_at:
+                continue
+            # Re-fetch the full job dict to arm (need the whole record).
+            job = get_job(job_id)
+            if not job:
+                continue
+            try:
+                self._arm_one_shot(job)
+            except Exception as e:
+                logger.warning("Chronos failed to arm job %s: %s", job_id, e)
+
+        # Cancel orphans (armed but no longer desired). ``observed`` is a
+        # snapshot, so _cancel mutating self._armed cannot disturb the loop.
+        for job_id in observed:
+            if job_id in desired:
+                continue
+            try:
+                self._cancel(job_id)
+            except Exception as e:
+                logger.warning("Chronos failed to cancel orphan %s: %s", job_id, e)
+
+    # -- fire -------------------------------------------------------------
+
+    def fire_due(self, job_id: str, *, adapters: Any = None, loop: Any = None) -> bool:
+        """Run the due job (claim + run_one_job via the ABC default), then
+        re-arm the NEXT one-shot through NAS.
+
+        Re-arm happens AFTER the run so next_run_at reflects the completed fire.
+        If the job is gone (one-shot completed / repeat-N exhausted), get_job
+        returns None → nothing to re-arm (the schedule naturally stops). A job
+        that was disabled or paused while it ran is not re-armed either, the
+        same filter reconcile applies.
+        """
+        ran = super().fire_due(job_id, adapters=adapters, loop=loop)
+        if ran:
+            from cron.jobs import get_job
+            job = get_job(job_id)
+            if (
+                job
+                and job.get("enabled")
+                and job.get("next_run_at")
+                and job.get("state") != "paused"
+            ):
+                try:
+                    self._arm_one_shot(job)
+                except Exception as e:
+                    logger.warning("Chronos failed to re-arm job %s after fire: %s", job_id, e)
+        return ran
+
+
+def register(ctx) -> None:
+    """Plugin entrypoint — register the Chronos provider with the loader.
+
+    Mirrors the memory-plugin shape; plugins/cron discovery calls this and
+    collects the provider via register_cron_scheduler.
+    """
+    ctx.register_cron_scheduler(ChronosCronScheduler())
diff --git a/plugins/cron/chronos/_nas_client.py b/plugins/cron/chronos/_nas_client.py
new file mode 100644
index 00000000000..04382adc8ea
--- /dev/null
+++ b/plugins/cron/chronos/_nas_client.py
@@ -0,0 +1,139 @@
+"""Thin HTTP client for the agent → NAS ``agent-cron`` endpoints (Chronos).
+
+The Chronos provider speaks ONLY to NAS — it names no scheduler vendor and
+holds no scheduler credentials. NAS owns the external scheduler (an internal
+implementation detail) and that scheduler's account; the agent just asks NAS to
+"arm a one-shot at time T" / "cancel" / "list", authenticated with the agent's
+existing Nous Portal access token (the same token it already uses to call the
+portal — no new secret).
+
+Wire contract: ``docs/chronos-managed-cron-contract.md``.
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import Any, Dict, List, Optional
+
+logger = logging.getLogger("cron.chronos")
+
+# Endpoint paths under the portal base URL.
+_PROVISION_PATH = "/api/agent-cron/provision"
+_CANCEL_PATH = "/api/agent-cron/cancel"
+_LIST_PATH = "/api/agent-cron/list"
+
+
+class NasCronClientError(RuntimeError):
+    """Raised when a NAS agent-cron call fails (non-2xx or transport error)."""
+
+
+class NasCronClient:
+    """Minimal client for the agent→NAS provision/cancel/list endpoints.
+
+    Uses the agent's refresh-aware Nous access token for auth. No scheduler
+    vendor, no scheduler creds — NAS hides all of that behind these three calls.
+    """
+
+    def __init__(self, portal_url: str, *, timeout_seconds: float = 15.0) -> None:
+        self.portal_url = portal_url.rstrip("/")
+        self.timeout_seconds = timeout_seconds
+
+    # -- auth -------------------------------------------------------------
+
+    def _access_token(self) -> str:
+        """The agent's existing Nous Portal access token (refresh-aware)."""
+        from hermes_cli.auth import resolve_nous_access_token
+        return resolve_nous_access_token()
+
+    def _headers(self) -> Dict[str, str]:
+        """Build the auth + JSON headers sent with every NAS call.
+
+        Raises:
+            NasCronClientError: no access token could be resolved.
+        """
+        token = self._access_token()
+        if not token:
+            # Never send "Bearer None" / "Bearer " — fail with a clear cause.
+            raise NasCronClientError("no Nous access token available")
+        return {
+            "Authorization": f"Bearer {token}",
+            "Content-Type": "application/json",
+        }
+
+    # -- HTTP -------------------------------------------------------------
+
+    def _send(self, verb: str, path: str, call: Any, **kwargs: Any) -> Dict[str, Any]:
+        """Shared request → status check → JSON decode path for _post/_get.
+
+        ``call`` is ``requests.post`` / ``requests.get``, resolved by the caller
+        so the lazy import stays where it was.
+
+        Raises:
+            NasCronClientError: transport or token failure, or a non-2xx reply.
+        """
+        url = f"{self.portal_url}{path}"
+        try:
+            resp = call(
+                url, headers=self._headers(), timeout=self.timeout_seconds, **kwargs
+            )
+        except Exception as e:
+            raise NasCronClientError(f"{verb} {path} failed: {e}") from e
+        if resp.status_code // 100 != 2:
+            raise NasCronClientError(
+                f"{verb} {path} returned {resp.status_code}: {resp.text[:200]}"
+            )
+        if not resp.content:
+            return {}
+        try:
+            return resp.json()
+        except ValueError as e:
+            # 2xx with an unparseable body: still best-effort "no payload", but
+            # leave a trace instead of swallowing it silently.
+            logger.debug("%s %s returned a non-JSON body: %s", verb, path, e)
+            return {}
+
+    def _post(self, path: str, body: Dict[str, Any]) -> Dict[str, Any]:
+        """POST ``body`` as JSON to ``path``; return the decoded reply."""
+        import requests  # lazy: agent already depends on requests
+
+        return self._send("POST", path, requests.post, json=body)
+
+    def _get(self, path: str, params: Dict[str, Any]) -> Dict[str, Any]:
+        """GET ``path`` with ``params``; return the decoded reply."""
+        import requests
+
+        return self._send("GET", path, requests.get, params=params)
+
+    # -- endpoints --------------------------------------------------------
+
+    def provision(self, *, job_id: str, fire_at: str, agent_callback_url: str,
+                  dedup_key: str) -> Dict[str, Any]:
+        """Ask NAS to arm a one-shot for ``job_id`` at ``fire_at`` (ISO 8601).
+
+        ``dedup_key`` (``{job_id}:{fire_at}``) makes re-arming the same fire
+        idempotent NAS-side. Returns the NAS response (e.g. ``{schedule_id}``).
+        """
+        return self._post(_PROVISION_PATH, {
+            "job_id": job_id,
+            "fire_at": fire_at,
+            "agent_callback_url": agent_callback_url,
+            "dedup_key": dedup_key,
+        })
+
+    def cancel(self, *, job_id: str) -> Dict[str, Any]:
+        """Ask NAS to cancel any armed one-shot for ``job_id``."""
+        return self._post(_CANCEL_PATH, {"job_id": job_id})
+
+    def list_armed(self) -> List[Dict[str, Any]]:
+        """List the one-shots NAS currently has armed for this agent.
+
+        Returns a list of ``{job_id, fire_at, schedule_id}``. Best-effort: used
+        by reconcile to find orphaned arms on a cold process; on error the
+        caller falls back to idempotent re-arm of all desired jobs.
+        """
+        data = self._get(_LIST_PATH, {})
+        items = data.get("armed") if isinstance(data, dict) else None
+        if not isinstance(items, list):
+            return []
+        # Drop malformed entries so one bad row cannot void the whole listing.
+        return [item for item in items if isinstance(item, dict)]
diff --git a/plugins/cron/chronos/plugin.yaml b/plugins/cron/chronos/plugin.yaml
new file mode 100644
index 00000000000..aad48b35655
--- /dev/null
+++ b/plugins/cron/chronos/plugin.yaml
@@ -0,0 +1,9 @@
+name: chronos
+description: >-
+  Chronos — NAS-mediated managed cron provider for scale-to-zero hosted agents.
+  Delegates the "wake me at time T" trigger to Nous infrastructure so an idle
+  gateway can scale to zero and still fire cron jobs. The agent computes each
+  job's next-fire time and asks NAS to arm a one-shot; NAS calls the agent back
+  at fire time over an authenticated webhook. Inert unless cron.provider=chronos.
+version: 1.0.0
+author: Nous Research
diff --git a/tests/plugins/test_chronos_cron.py b/tests/plugins/test_chronos_cron.py
new file mode 100644
index 00000000000..36b32f7a501
--- /dev/null
+++ b/tests/plugins/test_chronos_cron.py
@@ -0,0 +1,232 @@
+"""Unit tests for the Chronos NAS-mediated cron provider (Phase 4D).
+
+All NAS calls are mocked — ZERO live network. These prove:
+  - is_available is config-only (no network), false without config.
+  - one-shot arming sends the right provision payload (incl. sub-minute fires —
+    the agent owns the time, so there's no 1-minute floor).
+  - reconcile arms missing, cancels orphaned, skips paused.
+  - a failed cancel keeps the local arm so the next reconcile retries it.
+  - fire_due re-arms the next one-shot after a successful run; a gone job
+    (repeat-N exhausted) or a paused job stops re-arming.
+"""
+
+import pytest
+
+
+@pytest.fixture
+def temp_home(tmp_path, monkeypatch):
+    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
+    yield tmp_path
+
+
+@pytest.fixture
+def chronos(monkeypatch):
+    """A ChronosCronScheduler with a fake NAS client capturing calls."""
+    from plugins.cron.chronos import ChronosCronScheduler
+
+    class FakeClient:
+        def __init__(self):
+            self.provisions = []
+            self.cancels = []
+            self._armed = []
+
+        def provision(self, *, job_id, fire_at, agent_callback_url, dedup_key):
+            self.provisions.append({
+                "job_id": job_id, "fire_at": fire_at,
+                "agent_callback_url": agent_callback_url, "dedup_key": dedup_key,
+            })
+            return {"schedule_id": f"sched-{job_id}"}
+
+        def cancel(self, *, job_id):
+            self.cancels.append(job_id)
+            return {}
+
+        def list_armed(self):
+            return list(self._armed)
+
+    prov = ChronosCronScheduler()
+    fake = FakeClient()
+    prov._client = fake
+    # callback_url is read via _cfg; patch the module helper to avoid config.
+    monkeypatch.setattr("plugins.cron.chronos._cfg",
+                        lambda *k, default="": "https://agent.example/" if k[-1] == "callback_url" else "https://portal.test")
+    return prov, fake
+
+
+# -- is_available -------------------------------------------------------------
+
+def test_is_available_false_without_config(temp_home, monkeypatch):
+    from plugins.cron.chronos import ChronosCronScheduler
+
+    monkeypatch.setattr("plugins.cron.chronos._cfg", lambda *k, default="": "")
+    assert ChronosCronScheduler().is_available() is False
+
+
+def test_is_available_true_with_config_and_token(temp_home, monkeypatch):
+    import plugins.cron.chronos as mod
+    from plugins.cron.chronos import ChronosCronScheduler
+
+    monkeypatch.setattr(mod, "_cfg", lambda *k, default="": "https://x")
+    monkeypatch.setattr("hermes_cli.auth.get_provider_auth_state",
+                        lambda pid: {"access_token": "tok"})
+    assert ChronosCronScheduler().is_available() is True
+
+
+def test_is_available_makes_no_network(temp_home, monkeypatch):
+    """is_available must not construct the NAS client / hit network."""
+    import plugins.cron.chronos as mod
+    from plugins.cron.chronos import ChronosCronScheduler
+
+    monkeypatch.setattr(mod, "_cfg", lambda *k, default="": "https://x")
+    monkeypatch.setattr("hermes_cli.auth.get_provider_auth_state",
+                        lambda pid: {"access_token": "tok"})
+    p = ChronosCronScheduler()
+
+    def explode():
+        raise AssertionError("is_available must not build the NAS client")
+
+    monkeypatch.setattr(p, "_get_client", explode)
+    assert p.is_available() is True  # did not call _get_client
+
+
+# -- arming -------------------------------------------------------------------
+
+def test_arm_one_shot_sends_provision(chronos):
+    prov, fake = chronos
+    prov._arm_one_shot({"id": "j1", "next_run_at": "2026-06-18T12:00:00+00:00"})
+
+    assert len(fake.provisions) == 1
+    p = fake.provisions[0]
+    assert p["job_id"] == "j1"
+    assert p["fire_at"] == "2026-06-18T12:00:00+00:00"
+    assert p["dedup_key"] == "j1:2026-06-18T12:00:00+00:00"
+    assert p["agent_callback_url"] == "https://agent.example/"
+
+
+def test_arm_one_shot_preserves_sub_minute_fire(chronos):
+    """Sub-minute fire times survive — the agent owns the time, so there's no
+    1-minute scheduler floor."""
+    prov, fake = chronos
+    prov._arm_one_shot({"id": "j2", "next_run_at": "2026-06-18T12:00:30+00:00"})
+    assert fake.provisions[0]["fire_at"] == "2026-06-18T12:00:30+00:00"
+
+
+def test_arm_one_shot_noop_without_next_run(chronos):
+    prov, fake = chronos
+    prov._arm_one_shot({"id": "j3", "next_run_at": None})
+    assert fake.provisions == []
+
+
+# -- reconcile ----------------------------------------------------------------
+
+def test_reconcile_arms_all_enabled(temp_home, chronos, monkeypatch):
+    prov, fake = chronos
+    jobs = [
+        {"id": "a", "enabled": True, "next_run_at": "2026-06-18T12:00:00+00:00", "state": "scheduled"},
+        {"id": "b", "enabled": True, "next_run_at": "2026-06-18T12:05:00+00:00", "state": "scheduled"},
+    ]
+    monkeypatch.setattr("cron.jobs.load_jobs", lambda: jobs)
+    monkeypatch.setattr("cron.jobs.get_job", lambda jid: next(j for j in jobs if j["id"] == jid))
+
+    prov.reconcile()
+    assert {p["job_id"] for p in fake.provisions} == {"a", "b"}
+    assert fake.cancels == []
+
+
+def test_reconcile_cancels_orphan_arms_desired(temp_home, chronos, monkeypatch):
+    prov, fake = chronos
+    # NAS already has a stale arm for deleted job "gone".
+    prov._armed = {"gone": "2026-06-18T11:00:00+00:00"}
+    jobs = [{"id": "a", "enabled": True, "next_run_at": "2026-06-18T12:00:00+00:00", "state": "scheduled"}]
+    monkeypatch.setattr("cron.jobs.load_jobs", lambda: jobs)
+    monkeypatch.setattr("cron.jobs.get_job", lambda jid: next((j for j in jobs if j["id"] == jid), None))
+
+    prov.reconcile()
+    assert [p["job_id"] for p in fake.provisions] == ["a"]
+    assert fake.cancels == ["gone"]
+
+
+def test_reconcile_skips_paused(temp_home, chronos, monkeypatch):
+    prov, fake = chronos
+    jobs = [{"id": "p", "enabled": True, "next_run_at": "2026-06-18T12:00:00+00:00", "state": "paused"}]
+    monkeypatch.setattr("cron.jobs.load_jobs", lambda: jobs)
+    monkeypatch.setattr("cron.jobs.get_job", lambda jid: next((j for j in jobs if j["id"] == jid), None))
+
+    prov.reconcile()
+    assert fake.provisions == []
+
+
+def test_reconcile_skips_already_armed_same_time(temp_home, chronos, monkeypatch):
+    prov, fake = chronos
+    prov._armed = {"a": "2026-06-18T12:00:00+00:00"}
+    jobs = [{"id": "a", "enabled": True, "next_run_at": "2026-06-18T12:00:00+00:00", "state": "scheduled"}]
+    monkeypatch.setattr("cron.jobs.load_jobs", lambda: jobs)
+    monkeypatch.setattr("cron.jobs.get_job", lambda jid: jobs[0])
+
+    prov.reconcile()
+    assert fake.provisions == []  # already armed at the same time → no re-arm
+
+
+def test_cancel_failure_keeps_arm_for_retry(chronos):
+    """A failed NAS cancel must not forget the arm — the next reconcile still
+    sees the orphan and retries."""
+    prov, fake = chronos
+    prov._armed = {"gone": "2026-06-18T11:00:00+00:00"}
+
+    def boom(*, job_id):
+        raise RuntimeError("NAS down")
+
+    fake.cancel = boom
+    with pytest.raises(RuntimeError):
+        prov._cancel("gone")
+    assert prov._armed == {"gone": "2026-06-18T11:00:00+00:00"}
+
+
+# -- fire_due re-arm ----------------------------------------------------------
+
+def test_fire_due_rearms_next_oneshot(chronos, monkeypatch):
+    prov, fake = chronos
+    # super().fire_due runs the job; stub the ABC default to "ran".
+    monkeypatch.setattr("cron.scheduler_provider.CronScheduler.fire_due",
+                        lambda self, jid, **kw: True)
+    monkeypatch.setattr("cron.jobs.get_job",
+                        lambda jid: {"id": jid, "enabled": True, "next_run_at": "2026-06-18T12:05:00+00:00"})
+
+    assert prov.fire_due("j1") is True
+    assert [p["job_id"] for p in fake.provisions] == ["j1"]
+    assert fake.provisions[0]["fire_at"] == "2026-06-18T12:05:00+00:00"
+
+
+def test_fire_due_no_rearm_when_job_gone(chronos, monkeypatch):
+    """repeat-N exhausted / one-shot completed → mark_job_run deleted the job →
+    get_job None → no re-arm (the schedule stops cleanly)."""
+    prov, fake = chronos
+    monkeypatch.setattr("cron.scheduler_provider.CronScheduler.fire_due",
+                        lambda self, jid, **kw: True)
+    monkeypatch.setattr("cron.jobs.get_job", lambda jid: None)
+
+    assert prov.fire_due("j1") is True
+    assert fake.provisions == []
+
+
+def test_fire_due_no_rearm_when_claim_lost(chronos, monkeypatch):
+    """If the run didn't happen (claim lost), don't re-arm."""
+    prov, fake = chronos
+    monkeypatch.setattr("cron.scheduler_provider.CronScheduler.fire_due",
+                        lambda self, jid, **kw: False)
+
+    assert prov.fire_due("j1") is False
+    assert fake.provisions == []
+
+
+def test_fire_due_no_rearm_when_paused(chronos, monkeypatch):
+    """A job paused while it ran is not re-armed (same filter as reconcile)."""
+    prov, fake = chronos
+    monkeypatch.setattr("cron.scheduler_provider.CronScheduler.fire_due",
+                        lambda self, jid, **kw: True)
+    monkeypatch.setattr("cron.jobs.get_job",
+                        lambda jid: {"id": jid, "enabled": True, "state": "paused",
+                                     "next_run_at": "2026-06-18T12:05:00+00:00"})
+
+    assert prov.fire_due("j1") is True
+    assert fake.provisions == []