diff --git a/cron/scheduler.py b/cron/scheduler.py
index 9bab59456ea..4f7940db0b1 100644
--- a/cron/scheduler.py
+++ b/cron/scheduler.py
@@ -2025,6 +2025,24 @@ def run_one_job(job: dict, *, adapters=None, loop=None, verbose: bool = False) -
         return False


+def _notify_provider_jobs_changed() -> None:
+    """Best-effort: tell the active scheduler provider the job set changed.
+
+    Called by the consumer surfaces (model tool / CLI / REST) AFTER a
+    successful store mutation (create/update/remove/pause/resume) so an external
+    provider (Chronos) can re-provision/cancel the affected one-shot via NAS.
+    No-op for the built-in (it re-reads jobs.json each tick), so the default
+    path is unchanged. Lives here (not in cron/jobs.py) to keep the store free
+    of provider imports — avoids an import cycle and keeps jobs.py low-coupling.
+    Never raises into the caller.
+    """
+    try:
+        from cron.scheduler_provider import resolve_cron_scheduler
+        resolve_cron_scheduler().on_jobs_changed()
+    except Exception as e:
+        logger.debug("on_jobs_changed notify failed: %s", e)
+
+
 def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> int:
     """
     Check and run all due jobs.
diff --git a/docs/chronos-managed-cron-contract.md b/docs/chronos-managed-cron-contract.md
new file mode 100644
index 00000000000..0848d5eb939
--- /dev/null
+++ b/docs/chronos-managed-cron-contract.md
@@ -0,0 +1,192 @@
+# Chronos managed-cron — agent ↔ NAS wire contract
+
+**Status:** authoritative wire spec for the Chronos cron provider.
+**Audience:** the NAS-side implementer of the `agent-cron` endpoints
+(`nous-account-service`) and anyone debugging the managed-cron path.
+
+Chronos lets a hosted Hermes gateway **scale to zero** while idle and still
+fire cron jobs. Instead of an in-process 60-second ticker, the agent asks NAS
+to arm exactly **one external one-shot per job at that job's real next-fire
+time**. NAS calls the agent back at fire time over an authenticated webhook;
+the agent runs the job and re-arms the next one-shot. Between fires the agent
+process can be fully stopped — it wakes only on a genuine fire.
+
+The external scheduler NAS uses to implement the one-shots is an **internal NAS
+implementation detail**. The agent never talks to it, never holds its
+credentials, and never names it. The agent only knows the three NAS endpoints
+below.
+
+```
+create/update/pause/resume/remove a cron job (agent side)
+        │
+        ▼
+ChronosCronScheduler.reconcile() ── agent computes next_run_at
+        │ POST {portal}/api/agent-cron/provision (auth: agent's Nous access token)
+        ▼
+NAS arms a one-shot for fire_at ── NAS owns the scheduler + its creds
+        │
+        ⏰ at fire_at
+        ▼
+scheduler → POST {portal}/api/agent-cron/relay (auth: scheduler signature, NAS-verified)
+        │
+        ▼
+NAS mints a short-lived agent-audience JWT (purpose=cron_fire)
+        │ POST {agent_callback_url}/api/cron/fire (auth: that JWT)
+        ▼
+agent verifies the NAS JWT → store CAS claim → run_one_job → re-arm next one-shot
+```
+
+## Trust model (read this first)
+
+| Hop | Who calls whom | Auth mechanism | Verified by |
+|---|---|---|---|
+| 1 | agent → NAS (`provision`/`cancel`/`list`) | the agent's existing **Nous Portal access token** (Bearer) | NAS (its normal agent-token path) |
+| 2 | scheduler → NAS (`relay`) | the scheduler's request **signature** | NAS (the signature path it already has) |
+| 3 | NAS → agent (`/api/cron/fire`) | a **short-lived NAS-minted JWT** (`aud=agent:{instance_id}`, `purpose=cron_fire`) | agent (PyJWT against NAS JWKS) |
+
+Why NAS-mediated rather than scheduler→agent direct: the scheduler signs with
+**NAS's** keys, which the agent does not (and should not) hold. The agent can
+only verify a **NAS-minted** token — a trust path it already has. This keeps
+all scheduler credentials inside NAS. (Full rationale: the plan's DQ-4.)
+
+No new secret is introduced on the agent: hop 1 reuses the token the agent
+already uses for the portal, and hop 3 reuses the NAS-JWT verification the agent
+already performs.
+
+---
+
+## Endpoint 1 — `POST /api/agent-cron/provision` (agent → NAS)
+
+Arm (or re-arm, idempotently) exactly one one-shot for a job.
+
+- **Auth:** `Authorization: Bearer `. NAS validates via
+  its normal agent-token path and scopes the row to the calling agent/org.
+- **Request body:**
+  ```json
+  {
+    "job_id": "ab12cd34",
+    "fire_at": "2026-06-18T12:34:56+00:00",
+    "agent_callback_url": "https://agent-xyz.fly.dev",
+    "dedup_key": "ab12cd34:2026-06-18T12:34:56+00:00"
+  }
+  ```
+  - `fire_at` — ISO 8601, **agent-computed**. May be sub-minute in the future;
+    NAS must honor second-granularity (the agent owns the time, so there is no
+    1-minute scheduler floor).
+  - `agent_callback_url` — the agent's own publicly-reachable base URL. NAS
+    POSTs `{agent_callback_url}/api/cron/fire` at fire time.
+  - `dedup_key` — `"{job_id}:{fire_at}"`. NAS **upserts by `(agent_id, job_id)`**
+    so re-arming the same fire is idempotent (no duplicate one-shots). A new
+    `fire_at` for the same `job_id` replaces the prior arm.
+- **Action:** arm one one-shot to fire at `fire_at`, destined for the NAS
+  **relay** route (Endpoint 3) — NOT the agent directly, so NAS stays in the
+  loop to mint the agent JWT. Persist `(agent_id, job_id, schedule_id,
+  agent_callback_url)`.
+- **Response:** `200 {"schedule_id": ""}`.
+
+## Endpoint 2 — `POST /api/agent-cron/cancel` (agent → NAS)
+
+- **Auth:** same as Endpoint 1.
+- **Body:** `{"job_id": "ab12cd34"}`.
+- **Action:** cancel the armed one-shot for `(agent_id, job_id)` and delete the
+  row. Idempotent — cancelling an unknown job is a 200 no-op.
+- **Response:** `200 {"ok": true}`.
+
+## Endpoint 3 — `POST /api/agent-cron/relay` (scheduler → NAS, the fire relay)
+
+- **Auth:** the scheduler's request **signature**, verified by NAS with the
+  signature path it already has. This is the trust boundary for the fire — a
+  forged relay call must be rejected here.
+- **Action:**
+  1. Look up `(agent_id, job_id) → agent_callback_url` from the persisted row.
+  2. Mint a **short-lived** JWT: `aud = "agent:{instance_id}"`,
+     `iss = {portal_url}`, `purpose = "cron_fire"`, small `exp` (≈60–120s),
+     signed with NAS's normal asymmetric signing key (published via JWKS).
+  3. `POST {agent_callback_url}/api/cron/fire` with
+     `Authorization: Bearer ` and body `{"job_id": "...", "fire_at": "..."}`.
+  4. Treat a non-2xx agent response as a **retryable** failure (let the
+     scheduler retry the relay). The agent's store CAS de-dupes a double fire,
+     so retries are safe.
+- **Response to the scheduler:** 2xx once the agent POST is accepted (202), so
+  the scheduler does not retry a delivered fire.
+
+---
+
+## Inbound `POST /api/cron/fire` (NAS → agent) — agent side, already implemented
+
+This is the agent endpoint NAS calls in Endpoint 3 step 3. Implemented on the
+`APIServerAdapter` (`gateway/platforms/api_server.py`); the verifier is
+`plugins/cron/chronos/verify.py`.
+
+- **Auth:** `Authorization: Bearer `. The agent verifies:
+  - signature against the NAS JWKS (`cron.chronos.nas_jwks_url`),
+  - `aud` == `cron.chronos.expected_audience` (this agent's
+    `agent:{instance_id}`),
+  - `iss` == `cron.chronos.portal_url`,
+  - `exp` / `nbf` (30s leeway),
+  - `purpose == "cron_fire"` — a general agent JWT (no/other purpose) is
+    rejected so it can't be replayed against this endpoint.
+- **Body:** `{"job_id": "ab12cd34", "fire_at": "..."}` (only `job_id` is used).
+- **Behavior:**
+  - invalid/missing/forged/expired/wrong-aud/wrong-purpose token → **401**, no
+    execution.
+  - missing `job_id` → **400**.
+  - valid → **202 `{"status": "accepted", "job_id": "..."}`** immediately, and
+    the job runs in the background. 202-before-run means a long agent turn never
+    trips the relay's HTTP timeout.
+- **At-most-once:** the agent claims the job with a store-level compare-and-set
+  (`claim_job_for_fire`) before running. A relay/scheduler retry that arrives
+  while the first fire is in flight (or after it completed) loses the claim and
+  does not double-run.
+
+---
+
+## At-most-once & re-arm semantics
+
+- **Recurring (cron/interval):** on fire, the agent advances `next_run_at`
+  (under its store lock) as part of the claim, runs the job, then re-provisions
+  a one-shot for the new `next_run_at`. A duplicate relay for the old `fire_at`
+  finds the claim taken / time advanced and is dropped.
+- **One-shot (`30m`, `+90s`, etc.):** fires once; `mark_job_run` marks it
+  completed. No re-arm.
+- **`repeat.times = N`:** `mark_job_run` deletes the job at the limit, so
+  `get_job` returns `None` after the final fire → the agent does **not** re-arm
+  → the schedule stops cleanly with no orphaned one-shot.
+- **Multi-replica agents:** the store CAS makes the fire at-most-once across N
+  gateway replicas sharing one `HERMES_HOME` — exactly one replica runs each
+  fire.
+
+## Reconcile (self-healing)
+
+The agent reconciles desired (`jobs.json`) vs armed on:
+- `start()` (gateway boot / wake),
+- every successful job mutation (`on_jobs_changed`),
+- piggybacked after each fire (re-arm).
+
+Reconcile arms missing/changed-time jobs and cancels orphans. A missed
+provision (transient NAS error) self-heals on the next reconcile. There is **no
+periodic wake** of a sleeping agent — that would negate scale-to-zero.
+
+## Config (agent side)
+
+All non-secret (`cron.chronos.*` in `config.yaml`); the agent holds no scheduler
+credentials. For hosted agents NAS sets these at provision time:
+
+| key | meaning |
+|---|---|
+| `cron.provider` | `"chronos"` to activate (empty = built-in ticker) |
+| `cron.chronos.portal_url` | NAS base URL (also the expected JWT `iss`) |
+| `cron.chronos.callback_url` | the agent's own public base URL for NAS→agent fires |
+| `cron.chronos.expected_audience` | this agent's JWT `aud` (`agent:{instance_id}`) |
+| `cron.chronos.nas_jwks_url` | NAS JWKS for verifying the fire JWT |
+
+If `callback_url` / `portal_url` is blank or the agent has no Nous login,
+`is_available()` returns False and the resolver falls back to the built-in
+in-process ticker — cron never loses its trigger.
+
+## Escape hatch (not default)
+
+The inbound `/api/cron/fire` verifier is pluggable (`get_fire_verifier()`). If
+relay volume through NAS ever saturates, a direct scheduler→agent mode with a
+per-job NAS-minted cron-key can replace the NAS-JWT verifier with **no change to
+the webhook handler**. NAS-mediated (this contract) is the default.
diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py
index c657f4b4c6d..f7e1ba42f85 100644
--- a/gateway/platforms/api_server.py
+++ b/gateway/platforms/api_server.py
@@ -717,6 +717,16 @@ except ImportError:
     _cron_resume = None
     _cron_trigger = None

+
+def _notify_cron_provider_jobs_changed() -> None:
+    """Tell the active cron scheduler provider the job set changed after a REST
+    mutation (no-op for the built-in). Best-effort — never breaks the handler."""
+    try:
+        from cron.scheduler import _notify_provider_jobs_changed
+        _notify_provider_jobs_changed()
+    except Exception:
+        pass
+
 # Defense-in-depth: mirror the agent-facing cronjob tool, which scans the
 # user-supplied prompt for exfiltration/injection payloads at create/update
 # time (tools/cronjob_tools.py). The REST cron endpoints are authenticated
@@ -3206,6 +3216,7 @@ class APIServerAdapter(BasePlatformAdapter):
                 kwargs["repeat"] = repeat

             job = _cron_create(**kwargs)
+            _notify_cron_provider_jobs_changed()
             return web.json_response({"job": job})
         except Exception as e:
             return web.json_response({"error": str(e)}, status=500)
@@ -3262,6 +3273,7 @@ class APIServerAdapter(BasePlatformAdapter):
             job = _cron_update(job_id, sanitized)
             if not job:
                 return web.json_response({"error": "Job not found"}, status=404)
+            _notify_cron_provider_jobs_changed()
             return web.json_response({"job": job})
         except Exception as e:
             return web.json_response({"error": str(e)}, status=500)
@@ -3281,6 +3293,7 @@ class APIServerAdapter(BasePlatformAdapter):
             success = _cron_remove(job_id)
             if not success:
                 return web.json_response({"error": "Job not found"}, status=404)
+            _notify_cron_provider_jobs_changed()
             return web.json_response({"ok": True})
         except Exception as e:
             return web.json_response({"error": str(e)}, status=500)
@@ -3300,6 +3313,7 @@ class APIServerAdapter(BasePlatformAdapter):
             job = _cron_pause(job_id)
             if not job:
                 return web.json_response({"error": "Job not found"}, status=404)
+            _notify_cron_provider_jobs_changed()
             return web.json_response({"job": job})
         except Exception as e:
             return web.json_response({"error": str(e)}, status=500)
@@ -3319,6 +3333,7 @@ class APIServerAdapter(BasePlatformAdapter):
             job = _cron_resume(job_id)
             if not job:
                 return web.json_response({"error": "Job not found"}, status=404)
+            _notify_cron_provider_jobs_changed()
             return web.json_response({"job": job})
         except Exception as e:
             return web.json_response({"error": str(e)}, status=500)
diff --git a/hermes_cli/config.py b/hermes_cli/config.py
index d53393ac432..79f56be5d2e 100644
--- a/hermes_cli/config.py
+++ b/hermes_cli/config.py
@@ -2132,6 +2132,25 @@ DEFAULT_CONFIG = {
         # An unknown or unavailable provider falls back to the built-in, so cron
         # never loses its trigger.
         "provider": "",
+        # Chronos (NAS-mediated managed cron) settings. Only consulted when
+        # provider == "chronos". All non-secret (URLs + the JWT audience): the
+        # agent holds NO external-scheduler credentials. For hosted agents, NAS
+        # sets these at provision time. The outbound provision call reuses the
+        # agent's existing Nous Portal token — there is no token key here.
+        "chronos": {
+            # NAS / portal base URL the agent calls to arm/cancel one-shots
+            # and that mints the inbound fire JWT (used as the expected issuer).
+            "portal_url": "https://portal.nousresearch.com",
+            # The agent's OWN publicly-reachable base URL for NAS→agent fires
+            # (NAS POSTs {callback_url}/api/cron/fire). Empty → Chronos is
+            # unavailable and the resolver falls back to the built-in ticker.
+            "callback_url": "",
+            # This agent's expected JWT audience (e.g. "agent:{instance_id}").
+            "expected_audience": "",
+            # NAS JWKS URL for verifying the inbound fire JWT's signature.
+            # Empty → the fire endpoint refuses all tokens (no unsigned decode).
+            "nas_jwks_url": "",
+        },
         # Wrap delivered cron responses with a header (task name) and footer
         # ("The agent cannot see this message"). Set to false for clean output.
         "wrap_response": True,
diff --git a/tests/cron/test_jobs_changed_notify.py b/tests/cron/test_jobs_changed_notify.py
new file mode 100644
index 00000000000..eed875186b4
--- /dev/null
+++ b/tests/cron/test_jobs_changed_notify.py
@@ -0,0 +1,101 @@
+"""Tests for on_jobs_changed wiring (Phase 4F.1).
+
+After a store mutation via the consumer surfaces (model tool / CLI / REST), the
+active scheduler provider's on_jobs_changed() must be invoked so an external
+provider (Chronos) re-provisions/cancels. The built-in's no-op default means
+the default path is unchanged.
+"""
+
+import pytest
+
+
+@pytest.fixture
+def temp_home(tmp_path, monkeypatch):
+    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
+    yield tmp_path
+
+
+def test_notify_helper_calls_provider_on_jobs_changed(monkeypatch):
+    """cron.scheduler._notify_provider_jobs_changed resolves the provider and
+    calls on_jobs_changed exactly once."""
+    import cron.scheduler_provider as sp
+    import cron.scheduler as sched
+
+    calls = []
+
+    class Spy(sp.CronScheduler):
+        @property
+        def name(self):
+            return "spy"
+
+        def start(self, stop_event, **kw):
+            pass
+
+        def on_jobs_changed(self):
+            calls.append(1)
+
+    monkeypatch.setattr(sp, "resolve_cron_scheduler", lambda: Spy())
+    sched._notify_provider_jobs_changed()
+    assert calls == [1]
+
+
+def test_notify_helper_swallows_provider_errors(monkeypatch):
+    """A provider that raises in on_jobs_changed must not propagate into the
+    caller (best-effort notify)."""
+    import cron.scheduler_provider as sp
+    import cron.scheduler as sched
+
+    class Boom(sp.CronScheduler):
+        @property
+        def name(self):
+            return "boom"
+
+        def start(self, stop_event, **kw):
+            pass
+
+        def on_jobs_changed(self):
+            raise RuntimeError("kaboom")
+
+    monkeypatch.setattr(sp, "resolve_cron_scheduler", lambda: Boom())
+    sched._notify_provider_jobs_changed()  # must not raise
+
+
+def test_builtin_notify_is_harmless(monkeypatch):
+    """With the built-in provider (default), notify is a no-op and never
+    raises."""
+    import cron.scheduler as sched
+    # default resolution → built-in; just assert it doesn't blow up.
+    sched._notify_provider_jobs_changed()
+
+
+def test_tool_create_notifies_provider(temp_home, monkeypatch):
+    """Creating a job via the cronjob tool path invokes on_jobs_changed."""
+    import cron.scheduler as sched
+    calls = []
+    monkeypatch.setattr(sched, "_notify_provider_jobs_changed",
+                        lambda: calls.append("changed"))
+
+    from tools.cronjob_tools import cronjob
+    import json
+
+    out = json.loads(cronjob(action="create", prompt="echo hi", schedule="every 5m", name="w"))
+    assert out["success"] is True
+    assert calls == ["changed"]
+
+
+def test_tool_remove_notifies_provider(temp_home, monkeypatch):
+    """Removing a job via the tool path invokes on_jobs_changed."""
+    import json
+    from tools.cronjob_tools import cronjob
+
+    created = json.loads(cronjob(action="create", prompt="x", schedule="every 5m", name="r"))
+    jid = created["job_id"]
+
+    import cron.scheduler as sched
+    calls = []
+    monkeypatch.setattr(sched, "_notify_provider_jobs_changed",
+                        lambda: calls.append("changed"))
+
+    out = json.loads(cronjob(action="remove", job_id=jid))
+    assert out["success"] is True
+    assert calls == ["changed"]
diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py
index 7ec31b806c4..0bd62b2fc37 100644
--- a/tools/cronjob_tools.py
+++ b/tools/cronjob_tools.py
@@ -33,6 +33,16 @@ from cron.jobs import (
 )


+def _notify_provider_jobs_changed_safe() -> None:
+    """Tell the active cron scheduler provider the job set changed (no-op for
+    the built-in). Best-effort — never lets a provider error break the tool."""
+    try:
+        from cron.scheduler import _notify_provider_jobs_changed
+        _notify_provider_jobs_changed()
+    except Exception:
+        pass
+
+
 # ---------------------------------------------------------------------------
 # Cron prompt scanning
 # ---------------------------------------------------------------------------
@@ -549,6 +559,7 @@ def cronjob(
                 workdir=_normalize_optional_job_value(workdir),
                 no_agent=_no_agent,
             )
+            _notify_provider_jobs_changed_safe()
             return json.dumps(
                 {
                     "success": True,
@@ -604,6 +615,7 @@ def cronjob(
             removed = remove_job(job_id)
             if not removed:
                 return tool_error(f"Failed to remove job '{job_id}'", success=False)
+            _notify_provider_jobs_changed_safe()
             return json.dumps(
                 {
                     "success": True,
@@ -619,10 +631,12 @@ def cronjob(

         if normalized == "pause":
             updated = pause_job(job_id, reason=reason)
+            _notify_provider_jobs_changed_safe()
             return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)

         if normalized == "resume":
             updated = resume_job(job_id)
+            _notify_provider_jobs_changed_safe()
             return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)

         if normalized in {"run", "run_now", "trigger"}:
@@ -711,6 +725,7 @@ def cronjob(
             if not updates:
                 return tool_error("No updates provided.", success=False)
             updated = update_job(job_id, updates)
+            _notify_provider_jobs_changed_safe()
             return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)

         return tool_error(f"Unknown cron action '{action}'", success=False)
diff --git a/website/docs/developer-guide/cron-internals.md b/website/docs/developer-guide/cron-internals.md
index c895d339b09..386302554d7 100644
--- a/website/docs/developer-guide/cron-internals.md
+++ b/website/docs/developer-guide/cron-internals.md
@@ -129,6 +129,48 @@ A provider only controls the trigger, never execution.

 In CLI mode, cron jobs only fire when `hermes cron` commands are run or during active CLI sessions.

+### Managed cron (Chronos) for scale-to-zero
+
+Hosted gateways can run the **Chronos** provider (`cron.provider: chronos`)
+instead of the built-in ticker. Chronos lets an idle gateway **scale to zero**
+and still fire cron jobs: rather than a 60-second in-process loop (which would
+keep the process awake), it asks Nous infrastructure to arm exactly **one
+managed one-shot per job at that job's real next-fire time**. At fire time Nous
+calls the gateway back over an authenticated webhook (`POST /api/cron/fire`);
+the gateway runs the job through the same `run_one_job` path as the built-in,
+then re-arms the next one-shot. Between fires the process can be fully stopped —
+it wakes only on a genuine fire, never on a periodic timer.
+
+The flow (the managed scheduler is provided by Nous; the agent holds no
+scheduler credentials):
+
+```
+create/update a cron job
+  → Chronos asks Nous to arm a one-shot at the job's next_run_at
+    (authenticated with the agent's existing Nous token)
+  → at fire time Nous calls the gateway: POST {callback_url}/api/cron/fire
+    (authenticated with a short-lived, purpose-scoped Nous-minted JWT)
+  → the gateway verifies the token, claims the job (store compare-and-set so
+    multi-replica deployments fire at-most-once), runs it, and re-arms the next
+    one-shot
+```
+
+Config (all non-secret; on hosted agents Nous sets these at provision time):
+
+| key | meaning |
+|---|---|
+| `cron.provider` | `chronos` to activate (empty = built-in ticker) |
+| `cron.chronos.portal_url` | Nous base URL (arming + the fire-token issuer) |
+| `cron.chronos.callback_url` | the gateway's own public base URL for inbound fires |
+| `cron.chronos.expected_audience` | this agent's fire-token audience |
+| `cron.chronos.nas_jwks_url` | key set for verifying the inbound fire token |
+
+If Chronos is misconfigured or the agent isn't logged into Nous,
+`resolve_cron_scheduler()` falls back to the built-in ticker (logged warning) —
+cron never loses its trigger. Recurring jobs re-arm after each fire; `repeat`-N
+jobs stop cleanly when the count is exhausted (no orphaned one-shot). The full
+agent↔Nous wire contract lives in `docs/chronos-managed-cron-contract.md`.
+
 ### Fresh Session Isolation

 Each cron job runs in a completely fresh agent session:
diff --git a/website/docs/reference/cli-commands.md b/website/docs/reference/cli-commands.md
index f0fe67d4349..0cf004f1a0c 100644
--- a/website/docs/reference/cli-commands.md
+++ b/website/docs/reference/cli-commands.md
@@ -534,11 +534,13 @@ hermes cron
 | `tick` | Run due jobs once and exit. |

 The cron **trigger** is pluggable via the `cron.provider` config key. Empty
-(the default) uses the built-in in-process ticker. A named provider (e.g.
-`chronos`, a managed-cron provider for scale-to-zero deployments) is discovered
-from `plugins/cron//` or `$HERMES_HOME/plugins//`; an unknown or
-unavailable provider falls back to the built-in, so cron is never left without
-a trigger. See the [cron internals](../developer-guide/cron-internals.md#gateway-integration) doc.
+(the default) uses the built-in in-process ticker. Set it to `chronos` (the
+NAS-managed provider for scale-to-zero hosted gateways) — configured via the
+`cron.chronos.*` keys (`portal_url`, `callback_url`, `expected_audience`,
+`nas_jwks_url`) — or name a custom provider under `plugins/cron//` or
+`$HERMES_HOME/plugins//`. An unknown or unavailable provider falls back to
+the built-in, so cron is never left without a trigger. See the
+[cron internals](../developer-guide/cron-internals.md#gateway-integration) doc.

 ## `hermes kanban`
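
Reading aid for the inbound `POST /api/cron/fire` checks listed in `docs/chronos-managed-cron-contract.md` (`aud`, `iss`, `exp`/`nbf` with 30s leeway, `purpose == "cron_fire"`). The sketch below covers only the claim checks and assumes the token's signature has already been verified against the NAS JWKS. The function name `check_fire_claims`, its return convention, requiring `exp` to be present, and treating `aud` as a single string are illustrative assumptions; this is not the code in `plugins/cron/chronos/verify.py`.

```python
import time

LEEWAY_SECONDS = 30  # the exp/nbf leeway stated in the contract


def check_fire_claims(claims, expected_audience, expected_issuer, now=None):
    """Return None if the decoded claims pass, else a short rejection reason.

    Hypothetical helper: it assumes signature verification against the NAS
    JWKS already happened and only mirrors the claim checks from the contract.
    """
    now = time.time() if now is None else now

    # aud must be this agent's own audience (assumed here to be one string).
    if claims.get("aud") != expected_audience:
        return "wrong audience"

    # iss must be the configured portal URL.
    if claims.get("iss") != expected_issuer:
        return "wrong issuer"

    # exp is required in this sketch and may be exceeded by at most the leeway.
    exp = claims.get("exp")
    if not isinstance(exp, (int, float)) or now > exp + LEEWAY_SECONDS:
        return "expired or missing exp"

    # nbf is optional; when present the token may be used slightly early.
    nbf = claims.get("nbf")
    if nbf is not None and now < nbf - LEEWAY_SECONDS:
        return "not yet valid"

    # A general agent JWT without purpose=cron_fire must not be accepted.
    if claims.get("purpose") != "cron_fire":
        return "wrong purpose"

    return None
```

Under the contract any non-`None` result would map to a 401 with no execution. Keeping the checks as a pure function over already-decoded claims is only one way to make them easy to unit-test.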