Merge pull request #48275 from NousResearch/feat/cron-scheduler-provider-chronos

feat(cron): pluggable CronScheduler interface + Chronos managed-cron provider (scale-to-zero)
This commit is contained in:
Teknium 2026-06-19 07:51:59 -07:00 committed by GitHub
commit 2a5e9d994a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 3017 additions and 85 deletions

View file

@ -976,6 +976,9 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None,
job["last_error"] = error if not success else None
# Track delivery failures separately — cleared on successful delivery
job["last_delivery_error"] = delivery_error
# Clear any external-fire claim so a re-armed recurring job can
# be claimed again on its next fire (Phase 4C CAS).
job["fire_claim"] = None
# Increment completed count
if job.get("repeat"):
@ -1057,6 +1060,71 @@ def advance_next_run(job_id: str) -> bool:
return False
def _machine_id() -> str:
    """Return a label naming this process inside a ``fire_claim`` record.

    The label is for attribution and debugging only; it is never compared
    when deciding who wins a claim.

    Returns:
        The stripped value of the ``HERMES_MACHINE_ID`` environment variable
        when it is non-empty, otherwise ``"<hostname>:<pid>"``. If the
        hostname lookup fails, ``"unknown"`` is used as the hostname part.
    """
    configured = os.getenv("HERMES_MACHINE_ID", "").strip()
    if not configured:
        # No explicit override: derive a label from the host and process.
        try:
            import socket

            hostname = socket.gethostname()
        except Exception:
            hostname = "unknown"
        return f"{hostname}:{os.getpid()}"
    return configured
def claim_job_for_fire(job_id: str, *, claim_ttl_seconds: int = 300) -> bool:
    """Try to claim ``job_id`` for one externally triggered fire.

    Intended for the external-provider fire path
    (``CronScheduler.fire_due``): when several gateway replicas receive the
    same "job is due" signal, only the caller that gets ``True`` back should
    run the job.

    Everything happens while holding ``_jobs_lock()``:

    * An unknown, disabled, or paused job is never claimable.
    * If the job already carries a ``fire_claim`` younger than
      ``claim_ttl_seconds``, the claim is lost.
    * Otherwise a new ``fire_claim`` (timestamp + ``_machine_id()``) is
      written. For ``cron`` / ``interval`` schedules ``next_run_at`` is also
      advanced when ``compute_next_run`` yields a value, so a late
      re-delivery for the old time cannot fire again. Other schedule kinds
      keep their ``next_run_at``; the fresh claim alone blocks duplicates.

    A claim older than the TTL, or one that cannot be parsed, is simply
    overwritten, so a process that died after claiming does not block the
    job forever. ``mark_job_run`` resets ``fire_claim`` on completion.

    Args:
        job_id: Identifier of the job to claim.
        claim_ttl_seconds: Age in seconds below which an existing claim is
            considered still held.

    Returns:
        ``True`` if this caller stamped the claim and the store was saved,
        ``False`` otherwise.
    """
    with _jobs_lock():
        all_jobs = load_jobs()
        target = next((entry for entry in all_jobs if entry["id"] == job_id), None)
        if target is None:
            return False

        # Disabled or paused jobs must not fire.
        if not (target.get("enabled", True) and target.get("state") != "paused"):
            return False

        now = _hermes_now()
        prior_claim = target.get("fire_claim")
        if prior_claim:
            try:
                stamped_at = _ensure_aware(datetime.fromisoformat(prior_claim["at"]))
                claim_is_fresh = (now - stamped_at).total_seconds() < claim_ttl_seconds
            except Exception:
                # Malformed claim record: treat it as absent and overwrite.
                claim_is_fresh = False
            if claim_is_fresh:
                return False  # another caller still holds this fire

        now_iso = now.isoformat()
        target["fire_claim"] = {"at": now_iso, "by": _machine_id()}

        schedule_kind = target.get("schedule", {}).get("kind")
        if schedule_kind in {"cron", "interval"}:
            upcoming = compute_next_run(target["schedule"], now_iso)
            if upcoming:
                target["next_run_at"] = upcoming

        save_jobs(all_jobs)
        return True
def get_due_jobs() -> List[Dict[str, Any]]:
"""Get all jobs that are due to run now.

View file

@ -2030,6 +2030,82 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
logger.debug("Job '%s': failed to reap stale auxiliary clients: %s", job_id, e)
def run_one_job(job: dict, *, adapters=None, loop=None, verbose: bool = False) -> bool:
    """Fire a single job: execute, save output, deliver, record the result.

    This is the firing body shared by the built-in ``tick`` and by an
    external provider's ``fire_due``, so both run the same sequence.

    Deciding whether the job is due, claiming it, and computing its next run
    are NOT done here; the caller is responsible for those. This function
    only runs the given job once.

    Args:
        job: The job record to run; ``job["id"]`` is required.
        adapters: Passed through to ``_deliver_result``.
        loop: Passed through to ``_deliver_result``.
        verbose: When true, log where the output was saved.

    Returns:
        ``True`` when the job was processed, including when the job itself
        failed (the failure is recorded through ``mark_job_run``). ``False``
        only when processing raised; that failure is recorded too.
    """
    try:
        success, output, final_response, error = run_job(job)

        saved_path = save_job_output(job["id"], output)
        if verbose:
            logger.info("Output saved to: %s", saved_path)

        # Successful runs deliver the agent's final response; failed runs
        # always deliver a failure summary instead.
        if success:
            message = final_response
        else:
            message = _summarize_cron_failure_for_delivery(job, error)

        # Whitespace-only content counts as empty: nothing is delivered and
        # the empty-response guard further down marks a soft failure.
        trimmed = message.strip()
        wants_delivery = bool(trimmed)
        if wants_delivery and success and SILENT_MARKER in trimmed.upper():
            # The agent asked to stay silent; output is already saved above.
            logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
            wants_delivery = False

        delivery_error = None
        if wants_delivery:
            try:
                delivery_error = _deliver_result(job, message, adapters=adapters, loop=loop)
            except Exception as delivery_exc:
                delivery_error = str(delivery_exc)
                logger.error("Delivery failed for job %s: %s", job["id"], delivery_exc)

        # An empty final response is a soft failure so last_status is not
        # "ok": the agent ran but produced nothing useful (issue #8585).
        # When ``success`` is true, ``trimmed`` is the stripped final response.
        if success and not trimmed:
            success = False
            error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)"

        mark_job_run(job["id"], success, error, delivery_error=delivery_error)
        return True

    except Exception as exc:
        logger.error("Error processing job %s: %s", job['id'], exc)
        mark_job_run(job["id"], False, str(exc))
        return False
def _notify_provider_jobs_changed() -> None:
    """Tell the active scheduler provider that the job set changed.

    Meant to be called by the consumer surfaces (model tool / CLI / REST)
    after a successful store mutation (create/update/remove/pause/resume),
    giving an external provider the chance to re-provision or cancel what it
    has armed. The built-in provider's ``on_jobs_changed`` is a no-op, so the
    default path is unaffected.

    The provider module is imported lazily inside the function so that the
    job store stays free of provider imports.

    Best-effort: any exception is logged at debug level and never propagates
    to the caller.
    """
    try:
        from cron.scheduler_provider import resolve_cron_scheduler as _resolve

        active_provider = _resolve()
        active_provider.on_jobs_changed()
    except Exception as exc:
        logger.debug("on_jobs_changed notify failed: %s", exc)
def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> int:
"""
Check and run all due jobs.
@ -2108,48 +2184,11 @@ def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> i
)
def _process_job(job: dict) -> bool:
"""Run one due job end-to-end: execute, save, deliver, mark."""
try:
success, output, final_response, error = run_job(job)
output_file = save_job_output(job["id"], output)
if verbose:
logger.info("Output saved to: %s", output_file)
# Deliver the final response to the origin/target chat.
# If the agent responded with [SILENT], skip delivery (but
# output is already saved above). Failed jobs always deliver.
deliver_content = final_response if success else _summarize_cron_failure_for_delivery(job, error)
# Treat whitespace-only final responses the same as empty
# responses: do not deliver a blank message, and let the
# empty-response guard below mark the run as a soft failure.
should_deliver = bool(deliver_content.strip())
if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
should_deliver = False
delivery_error = None
if should_deliver:
try:
delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
except Exception as de:
delivery_error = str(de)
logger.error("Delivery failed for job %s: %s", job["id"], de)
# Treat empty final_response as a soft failure so last_status
# is not "ok" — the agent ran but produced nothing useful.
# (issue #8585)
if success and not final_response.strip():
success = False
error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)"
mark_job_run(job["id"], success, error, delivery_error=delivery_error)
return True
except Exception as e:
logger.error("Error processing job %s: %s", job['id'], e)
mark_job_run(job["id"], False, str(e))
return False
"""Run one due job end-to-end. Thin wrapper around the shared
module-level ``run_one_job`` so ``tick`` and external providers
(Chronos ``fire_due``) use the identical execute → save → deliver → mark
body."""
return run_one_job(job, adapters=adapters, loop=loop, verbose=verbose)
# Partition due jobs: those with a per-job workdir mutate
# os.environ["TERMINAL_CWD"] inside run_job, which is process-global —

177
cron/scheduler_provider.py Normal file
View file

@ -0,0 +1,177 @@
"""CronScheduler provider interface (Axis B — the trigger).
EXPERIMENTAL — this interface is validated by exactly ONE consumer (the
built-in) until an external provider (Chronos, Phase 4) shakes it out. Until
then the module path, method signatures, and start() kwargs MAY change without
a deprecation cycle. Once a second provider validates the shape it becomes
stable. Any growth MUST be additive (new optional method with a default), never
a changed signature on start() or a new abstractmethod.
A CronScheduler decides *when* a due job fires. It does NOT decide what firing
means: execution + delivery stay in cron.scheduler.run_job / _deliver_result,
shared by all providers. Providers must never reimplement agent construction or
delivery.
The built-in InProcessCronScheduler runs the historical 60s daemon-thread
ticker. Alternative providers (e.g. Chronos, a NAS-mediated managed-cron
provider for scale-to-zero deployments) live under plugins/cron/<name>/ and are
selected via the `cron.provider` config key (empty = built-in).
"""
from __future__ import annotations
import threading
from abc import ABC, abstractmethod
from typing import Any
class CronScheduler(ABC):
    """Trigger provider (Axis B): decides WHEN a due cron job fires.

    Only ``name`` and ``start`` are abstract. ``stop`` and ``is_available``
    ship with safe defaults, and the Phase-4 hooks (``on_jobs_changed``,
    ``fire_due``, ``reconcile``) are deliberately NON-abstract so that the
    built-in provider satisfies the ABC without overriding them — see
    ``test_abc_growth_stays_additive``.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Short provider identifier, e.g. ``'builtin'`` or ``'chronos'``."""

    def is_available(self) -> bool:
        """Report whether this provider can run in the current environment.

        Implementations MUST NOT make network calls here. The default is
        ``True``; an external provider is expected to check for its
        configured endpoint/credentials. When a named provider returns
        ``False`` the resolver falls back to the built-in.
        """
        return True

    @abstractmethod
    def start(
        self,
        stop_event: threading.Event,
        *,
        adapters: Any = None,
        loop: Any = None,
        interval: int = 60,
    ) -> None:
        """Begin firing due jobs.

        The built-in implementation BLOCKS in its tick loop until
        ``stop_event`` is set, and is therefore run in a daemon thread by the
        caller. An external provider may instead register a schedule/webhook
        and return immediately, but it must still honor ``stop_event`` for
        teardown.
        """

    def stop(self) -> None:
        """Optional eager teardown hook; the default does nothing.

        Setting the stop event is the primary stop signal. Override this in
        providers that hold external resources (queue consumers, HTTP
        servers).
        """

    # --- Optional hooks for external providers (added Phase 4). --------------
    # Every hook below has a safe default so the built-in inherits working
    # behavior. Keep them NON-abstract — see test_abc_growth_stays_additive.

    def on_jobs_changed(self) -> None:
        """Hook invoked after a successful store mutation.

        Covers create/update/remove/pause/resume. External providers use it
        to reconcile their own registry. The default does nothing.
        """

    def fire_due(self, job_id: str, *, adapters: Any = None, loop: Any = None) -> bool:
        """Run one job right now through the shared firing body.

        Intended as the target of an inbound fire webhook from an external
        scheduler. The default implementation first claims the job via
        ``claim_job_for_fire`` (store-level at-most-once), then reloads it
        with ``get_job`` and hands it to ``run_one_job``.

        Returns:
            ``False`` when the claim was lost or the job no longer exists;
            otherwise whatever ``run_one_job`` returns.
        """
        from cron.jobs import claim_job_for_fire, get_job
        from cron.scheduler import run_one_job

        # Only look the job up once the claim is won; a lost claim means
        # another machine or retry already owns this fire.
        claimed_job = get_job(job_id) if claim_job_for_fire(job_id) else None
        if claimed_job is None:
            # Either the claim was lost or the job vanished between arm and
            # fire (e.g. a repeat limit was exhausted).
            return False
        return run_one_job(claimed_job, adapters=adapters, loop=loop)

    def reconcile(self) -> None:
        """Converge the external registry toward ``jobs.json``.

        External providers arm missing one-shots, cancel orphaned ones, and
        re-arm changed times. The default does nothing.
        """
def resolve_cron_scheduler() -> "CronScheduler":
    """Return the active cron scheduler provider.

    Reads ``cron.provider`` from config. Empty/absent → built-in. A named
    provider that is missing, fails to load, or reports
    ``is_available() == False`` falls back to the built-in with a warning —
    cron must never be left without a trigger.

    A failure to read the config itself also selects the built-in; it is
    logged at debug level so the fallback is diagnosable without adding
    noise to the default path.

    Returns:
        The configured provider instance, or a fresh
        ``InProcessCronScheduler`` on any fallback path.
    """
    import logging

    logger = logging.getLogger("cron.scheduler_provider")

    name = ""
    try:
        from hermes_cli.config import cfg_get, load_config

        name = (cfg_get(load_config(), "cron", "provider", default="") or "").strip()
    except Exception as e:
        # Config unreadable or malformed: keep name empty so the built-in is
        # used, but leave a trace instead of swallowing the error silently.
        logger.debug(
            "Could not read cron.provider from config (%s); using built-in ticker", e
        )

    # Empty or an explicit built-in alias selects the in-process ticker.
    if not name or name in ("builtin", "in-process", "inprocess"):
        return InProcessCronScheduler()

    try:
        from plugins.cron import load_cron_scheduler

        provider = load_cron_scheduler(name)
        if provider is None:
            logger.warning("cron.provider '%s' not found; using built-in ticker", name)
            return InProcessCronScheduler()
        if not provider.is_available():
            logger.warning("cron.provider '%s' not available; using built-in ticker", name)
            return InProcessCronScheduler()
        logger.info("Using cron scheduler provider: %s", provider.name)
        return provider
    except Exception as e:
        # Plugin import/constructor failure must not leave cron triggerless.
        logger.warning(
            "Failed to load cron.provider '%s' (%s); using built-in ticker", name, e
        )
        return InProcessCronScheduler()
class InProcessCronScheduler(CronScheduler):
    """Built-in provider: an in-process ticker, 60 seconds by default.

    ``start()`` stays inside its tick loop until ``stop_event`` is set, so
    the caller is expected to run it in a daemon thread.
    """

    @property
    def name(self) -> str:
        """Identifier of the built-in provider."""
        return "builtin"

    def start(self, stop_event, *, adapters=None, loop=None, interval=60):
        """Tick the cron scheduler every ``interval`` seconds until stopped.

        Each iteration calls ``cron.scheduler.tick`` with ``verbose=False``
        and ``sync=False``, forwarding ``adapters`` and ``loop``. An
        exception from a tick is logged at debug level and the loop carries
        on. Between ticks the loop waits on ``stop_event`` so that setting
        the event ends the wait early.
        """
        import logging

        from cron.scheduler import tick as run_tick

        log = logging.getLogger("cron.scheduler_provider")
        log.info("In-process cron scheduler started (interval=%ds)", interval)

        while True:
            if stop_event.is_set():
                break
            try:
                run_tick(verbose=False, adapters=adapters, loop=loop, sync=False)
            except Exception as exc:
                log.debug("Cron tick error: %s", exc)
            stop_event.wait(interval)

View file

@ -0,0 +1,196 @@
# Chronos managed-cron — agent ↔ NAS wire contract
**Status:** authoritative wire spec for the Chronos cron provider.
**Audience:** the NAS-side implementer of the `agent-cron` endpoints
(`nous-account-service`) and anyone debugging the managed-cron path.
Chronos lets a hosted Hermes gateway **scale to zero** while idle and still
fire cron jobs. Instead of an in-process 60-second ticker, the agent asks NAS
to arm exactly **one external one-shot per job at that job's real next-fire
time**. NAS calls the agent back at fire time over an authenticated webhook;
the agent runs the job and re-arms the next one-shot. Between fires the agent
process can be fully stopped — it wakes only on a genuine fire.
The external scheduler NAS uses to implement the one-shots is an **internal NAS
implementation detail**. The agent never talks to it, never holds its
credentials, and never names it. The agent only knows the three NAS endpoints
below.
```
create/update/pause/resume/remove a cron job (agent side)
ChronosCronScheduler.reconcile() ── agent computes next_run_at
│ POST {portal}/api/agent-cron/provision (auth: agent's Nous access token)
NAS arms a one-shot for fire_at ── NAS owns the scheduler + its creds
⏰ at fire_at
scheduler → POST {portal}/api/agent-cron/relay (auth: scheduler signature, NAS-verified)
NAS mints a short-lived agent-audience JWT (purpose=cron_fire)
│ POST {agent_callback_url}/api/cron/fire (auth: that JWT)
agent verifies the NAS JWT → store CAS claim → run_one_job → re-arm next one-shot
```
## Trust model (read this first)
| Hop | Who calls whom | Auth mechanism | Verified by |
|---|---|---|---|
| 1 | agent → NAS (`provision`/`cancel`/`list`) | the agent's existing **Nous Portal access token** (Bearer) | NAS (its normal agent-token path) |
| 2 | scheduler → NAS (`relay`) | the scheduler's request **signature** | NAS (the signature path it already has) |
| 3 | NAS → agent (`/api/cron/fire`) | a **short-lived NAS-minted JWT** (`aud=agent:{instance_id}`, `purpose=cron_fire`) | agent (PyJWT against NAS JWKS) |
Why NAS-mediated rather than scheduler→agent direct: the scheduler signs with
**NAS's** keys, which the agent does not (and should not) hold. The agent can
only verify a **NAS-minted** token — a trust path it already has. This keeps
all scheduler credentials inside NAS. (Full rationale: the plan's DQ-4.)
No new secret is introduced on the agent: hop 1 reuses the token the agent
already uses for the portal, and hop 3 reuses the NAS-JWT verification the agent
already performs.
---
## Endpoint 1 — `POST /api/agent-cron/provision` (agent → NAS)
Arm (or re-arm, idempotently) exactly one one-shot for a job.
- **Auth:** `Authorization: Bearer <agent Nous access token>`. NAS validates via
its normal agent-token path and scopes the row to the calling agent/org.
- **Request body:**
```json
{
"job_id": "ab12cd34",
"fire_at": "2026-06-18T12:34:56+00:00",
"agent_callback_url": "https://agent-xyz.fly.dev",
"dedup_key": "ab12cd34:2026-06-18T12:34:56+00:00"
}
```
- `fire_at` — ISO 8601, **agent-computed**. May be sub-minute in the future;
NAS must honor second-granularity (the agent owns the time, so there is no
1-minute scheduler floor).
- `agent_callback_url` — the agent's own publicly-reachable base URL. NAS
POSTs `{agent_callback_url}/api/cron/fire` at fire time.
- `dedup_key` — `"{job_id}:{fire_at}"`. NAS **upserts by `(agent_id, job_id)`**
so re-arming the same fire is idempotent (no duplicate one-shots). A new
`fire_at` for the same `job_id` replaces the prior arm.
- **Action:** arm one one-shot to fire at `fire_at`, destined for the NAS
**relay** route (Endpoint 3) — NOT the agent directly, so NAS stays in the
loop to mint the agent JWT. Persist `(agent_id, job_id, schedule_id,
agent_callback_url)`.
- **Response:** `200 {"schedule_id": "<opaque>"}`.
## Endpoint 2 — `POST /api/agent-cron/cancel` (agent → NAS)
- **Auth:** same as Endpoint 1.
- **Body:** `{"job_id": "ab12cd34"}`.
- **Action:** cancel the armed one-shot for `(agent_id, job_id)` and delete the
row. Idempotent — cancelling an unknown job is a 200 no-op.
- **Response:** `200 {"ok": true}`.
## Endpoint 3 — `POST /api/agent-cron/relay` (scheduler → NAS, the fire relay)
- **Auth:** the scheduler's request **signature**, verified by NAS with the
signature path it already has. This is the trust boundary for the fire — a
forged relay call must be rejected here.
- **Action:**
1. Look up `(agent_id, job_id) → agent_callback_url` from the persisted row.
2. Mint a **short-lived** JWT: `aud = "agent:{instance_id}"`,
`iss = {portal_url}`, `purpose = "cron_fire"`, small `exp` (≈60–120s),
signed with NAS's normal asymmetric signing key (published via JWKS).
3. `POST {agent_callback_url}/api/cron/fire` with
`Authorization: Bearer <that JWT>` and body `{"job_id": "...", "fire_at": "..."}`.
4. Treat a non-2xx agent response as a **retryable** failure (let the
scheduler retry the relay). The agent's store CAS de-dupes a double fire,
so retries are safe.
- **Response to the scheduler:** 2xx once the agent POST is accepted (202), so
the scheduler does not retry a delivered fire.
---
## Inbound `POST /api/cron/fire` (NAS → agent) — agent side, already implemented
This is the agent endpoint NAS calls in Endpoint 3 step 3. Served by the
**dashboard app** (`hermes_cli/web_server.py`) — the agent's always-reachable
public HTTP surface on hosted deployments (the gateway may be idle/scaled down);
it is in `PUBLIC_API_PATHS` so the dashboard cookie gate lets the bearer-JWT
callback through to the verifier. (Also registered on the optional
`APIServerAdapter` for self-host API-server deployments.) The verifier is
`plugins/cron/chronos/verify.py`.
- **Auth:** `Authorization: Bearer <NAS-minted JWT>`. The agent verifies:
- signature against the NAS JWKS (`cron.chronos.nas_jwks_url`),
- `aud` == `cron.chronos.expected_audience` (this agent's
`agent:{instance_id}`),
- `iss` == `cron.chronos.portal_url`,
- `exp` / `nbf` (30s leeway),
- `purpose == "cron_fire"` — a general agent JWT (no/other purpose) is
rejected so it can't be replayed against this endpoint.
- **Body:** `{"job_id": "ab12cd34", "fire_at": "..."}` (only `job_id` is used).
- **Behavior:**
- invalid/missing/forged/expired/wrong-aud/wrong-purpose token → **401**, no
execution.
- missing `job_id` → **400**.
- valid → **202 `{"status": "accepted", "job_id": "..."}`** immediately, and
the job runs in the background. 202-before-run means a long agent turn never
trips the relay's HTTP timeout.
- **At-most-once:** the agent claims the job with a store-level compare-and-set
(`claim_job_for_fire`) before running. A relay/scheduler retry that arrives
while the first fire is in flight (or after it completed) loses the claim and
does not double-run.
---
## At-most-once & re-arm semantics
- **Recurring (cron/interval):** on fire, the agent advances `next_run_at`
(under its store lock) as part of the claim, runs the job, then re-provisions
a one-shot for the new `next_run_at`. A duplicate relay for the old `fire_at`
finds the claim taken / time advanced and is dropped.
- **One-shot (`30m`, `+90s`, etc.):** fires once; `mark_job_run` marks it
completed. No re-arm.
- **`repeat.times = N`:** `mark_job_run` deletes the job at the limit, so
`get_job` returns `None` after the final fire → the agent does **not** re-arm
→ the schedule stops cleanly with no orphaned one-shot.
- **Multi-replica agents:** the store CAS makes the fire at-most-once across N
gateway replicas sharing one `HERMES_HOME` — exactly one replica runs each
fire.
## Reconcile (self-healing)
The agent reconciles desired (`jobs.json`) vs armed on:
- `start()` (gateway boot / wake),
- every successful job mutation (`on_jobs_changed`),
- piggybacked after each fire (re-arm).
Reconcile arms missing/changed-time jobs and cancels orphans. A missed
provision (transient NAS error) self-heals on the next reconcile. There is **no
periodic wake** of a sleeping agent — that would negate scale-to-zero.
## Config (agent side)
All non-secret (`cron.chronos.*` in `config.yaml`); the agent holds no scheduler
credentials. For hosted agents NAS sets these at provision time:
| key | meaning |
|---|---|
| `cron.provider` | `"chronos"` to activate (empty = built-in ticker) |
| `cron.chronos.portal_url` | NAS base URL (also the expected JWT `iss`) |
| `cron.chronos.callback_url` | the agent's own public base URL for NAS→agent fires |
| `cron.chronos.expected_audience` | this agent's JWT `aud` (`agent:{instance_id}`) |
| `cron.chronos.nas_jwks_url` | NAS JWKS for verifying the fire JWT |
If `callback_url` / `portal_url` is blank or the agent has no Nous login,
`is_available()` returns False and the resolver falls back to the built-in
in-process ticker — cron never loses its trigger.
## Escape hatch (not default)
The inbound `/api/cron/fire` verifier is pluggable (`get_fire_verifier()`). If
relay volume through NAS ever saturates, a direct scheduler→agent mode with a
per-job NAS-minted cron-key can replace the NAS-JWT verifier with **no change to
the webhook handler**. NAS-mediated (this contract) is the default.

View file

@ -717,6 +717,16 @@ except ImportError:
_cron_resume = None
_cron_trigger = None
def _notify_cron_provider_jobs_changed() -> None:
    """Forward a "job set changed" notice to the cron scheduler provider.

    Called after a REST mutation. Delegates to
    ``cron.scheduler._notify_provider_jobs_changed``, imported lazily.
    Best-effort — any failure, including the import itself, is ignored so
    the handler that called this is never broken by it.
    """
    try:
        from cron.scheduler import _notify_provider_jobs_changed as _notify

        _notify()
    except Exception:
        # Deliberately silent: notification must not affect the REST reply.
        return None
    return None
# Defense-in-depth: mirror the agent-facing cronjob tool, which scans the
# user-supplied prompt for exfiltration/injection payloads at create/update
# time (tools/cronjob_tools.py). The REST cron endpoints are authenticated
@ -3212,6 +3222,7 @@ class APIServerAdapter(BasePlatformAdapter):
kwargs["repeat"] = repeat
job = _cron_create(**kwargs)
_notify_cron_provider_jobs_changed()
return web.json_response({"job": job})
except Exception as e:
return web.json_response({"error": str(e)}, status=500)
@ -3268,6 +3279,7 @@ class APIServerAdapter(BasePlatformAdapter):
job = _cron_update(job_id, sanitized)
if not job:
return web.json_response({"error": "Job not found"}, status=404)
_notify_cron_provider_jobs_changed()
return web.json_response({"job": job})
except Exception as e:
return web.json_response({"error": str(e)}, status=500)
@ -3287,6 +3299,7 @@ class APIServerAdapter(BasePlatformAdapter):
success = _cron_remove(job_id)
if not success:
return web.json_response({"error": "Job not found"}, status=404)
_notify_cron_provider_jobs_changed()
return web.json_response({"ok": True})
except Exception as e:
return web.json_response({"error": str(e)}, status=500)
@ -3306,6 +3319,7 @@ class APIServerAdapter(BasePlatformAdapter):
job = _cron_pause(job_id)
if not job:
return web.json_response({"error": "Job not found"}, status=404)
_notify_cron_provider_jobs_changed()
return web.json_response({"job": job})
except Exception as e:
return web.json_response({"error": str(e)}, status=500)
@ -3325,6 +3339,7 @@ class APIServerAdapter(BasePlatformAdapter):
job = _cron_resume(job_id)
if not job:
return web.json_response({"error": "Job not found"}, status=404)
_notify_cron_provider_jobs_changed()
return web.json_response({"job": job})
except Exception as e:
return web.json_response({"error": str(e)}, status=500)
@ -3348,6 +3363,64 @@ class APIServerAdapter(BasePlatformAdapter):
except Exception as e:
return web.json_response({"error": str(e)}, status=500)
async def _handle_cron_fire(self, request: "web.Request") -> "web.Response":
    """POST /api/cron/fire — Chronos managed-cron fire webhook (NAS → agent).

    Authenticated by a NAS-minted JWT (verified via the pluggable
    fire-verifier), NOT API_SERVER_KEY — NAS holds no API server key, and
    this is the only inbound that can trigger remote job execution, so it
    gets its own purpose-scoped token check.

    Responses:
        401 — the verifier returned ``None`` for the bearer token.
        400 — the body is not a JSON object, or ``job_id`` is missing,
              empty, or not a string.
        202 — accepted; the job runs in the background so a long agent turn
              never trips NAS's HTTP timeout.

    The store CAS claim inside ``fire_due`` guards against double-fire on a
    NAS/scheduler retry.
    """
    from hermes_cli.config import cfg_get, load_config
    from plugins.cron.chronos.verify import get_fire_verifier

    auth = request.headers.get("Authorization", "")
    token = auth[7:].strip() if auth.startswith("Bearer ") else ""

    cfg = load_config()
    claims = get_fire_verifier()(
        token=token,
        expected_audience=cfg_get(cfg, "cron", "chronos", "expected_audience", default=""),
        jwks_or_key=cfg_get(cfg, "cron", "chronos", "nas_jwks_url", default="") or None,
        issuer=cfg_get(cfg, "cron", "chronos", "portal_url", default="") or None,
    )
    if claims is None:
        logger.warning(
            "cron fire: rejected invalid token: %s",
            self._request_audit_log_suffix(request),
        )
        return web.json_response({"error": "invalid fire token"}, status=401)

    try:
        body = await request.json()
    except Exception:
        body = None

    # A syntactically valid JSON body can still be a list, string or number;
    # calling .get() on it would raise and turn a client error into a 500.
    job_id = body.get("job_id") if isinstance(body, dict) else None
    # Job ids are strings in the store, so a non-string id can never match a
    # job; reject it up front rather than accept it with a 202.
    if not isinstance(job_id, str) or not job_id:
        return web.json_response({"error": "missing job_id"}, status=400)

    from cron.scheduler_provider import resolve_cron_scheduler

    provider = resolve_cron_scheduler()
    loop = asyncio.get_running_loop()

    # Fire in the background (202 immediately). fire_due claims via the
    # store CAS, so a retry while this is in flight is de-duped.
    task = asyncio.create_task(
        asyncio.to_thread(provider.fire_due, job_id, adapters=None, loop=loop)
    )
    try:
        # Track the task on the adapter and drop it once finished.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
    except (TypeError, AttributeError):
        pass

    return web.json_response({"status": "accepted", "job_id": job_id}, status=202)
# ------------------------------------------------------------------
# Output extraction helper
# ------------------------------------------------------------------
@ -4202,6 +4275,11 @@ class APIServerAdapter(BasePlatformAdapter):
self._app.router.add_post("/api/jobs/{job_id}/pause", self._handle_pause_job)
self._app.router.add_post("/api/jobs/{job_id}/resume", self._handle_resume_job)
self._app.router.add_post("/api/jobs/{job_id}/run", self._handle_run_job)
# Chronos managed-cron fire webhook (NAS → agent). Authenticated by a
# NAS-minted JWT (NOT API_SERVER_KEY), so it has its own auth path.
if _CRON_AVAILABLE:
self._app.router.add_post("/api/cron/fire", self._handle_cron_fire)
# Structured event streaming
self._app.router.add_post("/v1/runs", self._handle_runs)
self._app.router.add_get("/v1/runs/{run_id}", self._handle_get_run)

View file

@ -16934,21 +16934,20 @@ def _run_planned_stop_watcher(
stop_event.wait(poll_interval)
def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60):
"""
Background thread that ticks the cron scheduler at a regular interval.
Runs inside the gateway process so cronjobs fire automatically without
needing a separate `hermes cron daemon` or system cron entry.
def _start_gateway_housekeeping(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60):
"""Background thread for gateway-only periodic chores (NOT cron).
When ``adapters`` and ``loop`` are provided, passes them through to the
cron delivery path so live adapters can be used for E2EE rooms.
Split out of the historical ``_start_cron_ticker`` so the cron *trigger*
can live behind the ``CronScheduler`` provider (built-in or external) while
these gateway-specific chores keep running independently of which provider
fires cron. An external scale-to-zero provider has no 60s loop at all, but
this housekeeping still wants its hourly cadence so it owns its own loop.
Also refreshes the channel directory every 5 minutes and prunes the
image/audio/document cache + expired ``hermes debug share`` pastes
once per hour.
Refreshes the channel directory every 5 minutes and prunes the
image/audio/document cache + expired ``hermes debug share`` pastes once per
hour, and polls the curator hourly (its inner gate enforces the real
weekly cadence).
"""
from cron.scheduler import tick as cron_tick
from gateway.platforms.base import cleanup_image_cache, cleanup_document_cache
from hermes_cli.debug import _sweep_expired_pastes
@ -16957,14 +16956,9 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
PASTE_SWEEP_EVERY = 60 # ticks — once per hour
CURATOR_EVERY = 60 # ticks — poll hourly (inner gate handles the real cadence)
logger.info("Cron ticker started (interval=%ds)", interval)
logger.info("Gateway housekeeping started (interval=%ds)", interval)
tick_count = 0
while not stop_event.is_set():
try:
cron_tick(verbose=False, adapters=adapters, loop=loop, sync=False)
except Exception as e:
logger.debug("Cron tick error: %s", e)
tick_count += 1
if tick_count % CHANNEL_DIR_EVERY == 0 and adapters:
@ -16972,9 +16966,9 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
from gateway.channel_directory import build_channel_directory
if loop is not None:
# build_channel_directory is async (Slack web calls), and
# this ticker runs in a background thread. Schedule onto
# the gateway event loop and wait briefly for completion
# so refresh failures are still logged via the except.
# this runs in a background thread. Schedule onto the
# gateway event loop and wait briefly for completion so
# refresh failures are still logged via the except.
fut = safe_schedule_threadsafe(
build_channel_directory(adapters), loop,
logger=logger,
@ -17010,7 +17004,7 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
except Exception as e:
logger.debug("Paste sweep error: %s", e)
# Curator — piggy-back on the existing cron ticker so long-running
# Curator — piggy-back on the housekeeping loop so long-running
# gateways get weekly skill maintenance without needing restarts.
# maybe_run_curator() is internally gated by config.interval_hours
# (7 days by default), so CURATOR_EVERY is just the poll rate — the
@ -17026,7 +17020,22 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
logger.debug("Curator tick error: %s", e)
stop_event.wait(timeout=interval)
logger.info("Cron ticker stopped")
logger.info("Gateway housekeeping stopped")
def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60):
    """Deprecated compatibility shim around the built-in cron scheduler.

    Cron triggering is owned by the ``CronScheduler`` provider
    (``cron.scheduler_provider``), and ``start_gateway`` starts the resolved
    provider itself. This function is kept only so that external callers and
    tests that still reference the old symbol (e.g. hermes_cli/debug.py) keep
    working: it starts the built-in ``InProcessCronScheduler`` with the
    arguments it was given.

    Gateway housekeeping is NOT run here; that moved to
    ``_start_gateway_housekeeping``.
    """
    from cron.scheduler_provider import InProcessCronScheduler

    builtin_scheduler = InProcessCronScheduler()
    builtin_scheduler.start(
        stop_event,
        adapters=adapters,
        loop=loop,
        interval=interval,
    )
async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = False, verbosity: Optional[int] = 0) -> bool:
@ -17422,17 +17431,34 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
logger.error("Gateway exiting cleanly: %s", runner.exit_reason)
return True
# Start background cron ticker so scheduled jobs fire automatically.
# Pass the event loop so cron delivery can use live adapters (E2EE support).
# Start the background cron scheduler via the resolved provider so
# scheduled jobs fire automatically. The built-in provider is the
# historical in-process 60s ticker; an external provider (e.g. chronos)
# may arm a schedule and return. Pass the event loop so cron delivery can
# use live adapters (E2EE support).
from cron.scheduler_provider import resolve_cron_scheduler
cron_stop = threading.Event()
cron_provider = resolve_cron_scheduler()
cron_thread = threading.Thread(
target=_start_cron_ticker,
target=cron_provider.start,
args=(cron_stop,),
kwargs={"adapters": runner.adapters, "loop": asyncio.get_running_loop()},
daemon=True,
name="cron-ticker",
name="cron-scheduler",
)
cron_thread.start()
# Gateway-only periodic housekeeping (channel dir, cache cleanup, paste
# sweep, curator) — runs independently of which cron provider is active.
# Shares cron_stop as the shutdown signal.
housekeeping_thread = threading.Thread(
target=_start_gateway_housekeeping,
args=(cron_stop,),
kwargs={"adapters": runner.adapters, "loop": asyncio.get_running_loop()},
daemon=True,
name="gateway-housekeeping",
)
housekeeping_thread.start()
# Wait for shutdown
await runner.wait_for_shutdown()
@ -17442,9 +17468,14 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
logger.error("Gateway exiting with failure: %s", runner.exit_reason)
return False
# Stop cron ticker cleanly
# Stop cron scheduler + housekeeping cleanly
cron_stop.set()
try:
cron_provider.stop()
except Exception as e:
logger.debug("Cron provider stop() error: %s", e)
cron_thread.join(timeout=5)
housekeeping_thread.join(timeout=5)
# Stop the planned-stop watcher (daemon=True so this is belt-and-suspenders).
_planned_stop_watcher_stop.set()

View file

@ -2247,6 +2247,33 @@ DEFAULT_CONFIG = {
},
"cron": {
# Active cron SCHEDULER provider (Axis B — the trigger that decides
# WHEN a due job fires). Empty string = the built-in in-process 60s
# ticker (default). Name an installed provider (plugins/cron/<name>/ or
# $HERMES_HOME/plugins/<name>/) to relocate the trigger — e.g. "chronos",
# the NAS-mediated managed-cron provider for scale-to-zero deployments.
# An unknown or unavailable provider falls back to the built-in, so cron
# never loses its trigger.
"provider": "",
# Chronos (NAS-mediated managed cron) settings. Only consulted when
# provider == "chronos". All non-secret (URLs + the JWT audience): the
# agent holds NO external-scheduler credentials. For hosted agents, NAS
# sets these at provision time. The outbound provision call reuses the
# agent's existing Nous Portal token — there is no token key here.
"chronos": {
# NAS / portal base URL the agent calls to arm/cancel one-shots
# and that mints the inbound fire JWT (used as the expected issuer).
"portal_url": "https://portal.nousresearch.com",
# The agent's OWN publicly-reachable base URL for NAS→agent fires
# (NAS POSTs {callback_url}/api/cron/fire). Empty → Chronos is
# unavailable and the resolver falls back to the built-in ticker.
"callback_url": "",
# This agent's expected JWT audience (e.g. "agent:{instance_id}").
"expected_audience": "",
# NAS JWKS URL for verifying the inbound fire JWT's signature.
# Empty → the fire endpoint refuses all tokens (no unsigned decode).
"nas_jwks_url": "",
},
# Wrap delivered cron responses with a header (task name) and footer
# ("The agent cannot see this message"). Set to false for clean output.
"wrap_response": True,

View file

@ -46,4 +46,10 @@ PUBLIC_API_PATHS: frozenset[str] = frozenset({
# Read-only theme + plugin manifests for the dashboard skin engine.
"/api/dashboard/themes",
"/api/dashboard/plugins",
# Chronos managed-cron fire webhook (NAS -> agent). NOT cookie-gated: it
# carries its own short-lived NAS-minted JWT (purpose=cron_fire), which the
# handler verifies as the real auth. Must bypass the dashboard auth gate so
# the NAS relay's bearer-only callback reaches the verifier instead of a
# 401 no_cookie. The JWT — not this allowlist — is the security boundary.
"/api/cron/fire",
})

View file

@ -124,23 +124,20 @@ def _start_desktop_cron_ticker(stop_event: "threading.Event", interval: int = 60
The scheduler tick loop normally lives in ``hermes gateway run`` but the
desktop app spawns a ``hermes dashboard`` backend, not a gateway, so a cron
a user creates in the app would never fire. We run a minimal ticker here
(no live adapters; delivery falls back to the per-platform send path).
a user creates in the app would never fire. We run the resolved cron
scheduler provider here (no live adapters; delivery falls back to the
per-platform send path).
Cross-process safe: ``cron.scheduler.tick`` takes the ``cron/.tick.lock``
file lock, so this never double-fires alongside a real gateway on the same
HERMES_HOME whichever process grabs the lock first wins the tick.
Cross-process safe: the built-in provider's ``cron.scheduler.tick`` takes
the ``cron/.tick.lock`` file lock, so this never double-fires alongside a
real gateway on the same HERMES_HOME whichever process grabs the lock
first wins the tick.
"""
from cron.scheduler import tick as cron_tick
from cron.scheduler_provider import resolve_cron_scheduler
_log.info("Desktop cron ticker started (interval=%ds)", interval)
# Tick once up front (catches jobs due at launch), then on the interval.
while not stop_event.is_set():
try:
cron_tick(verbose=False, sync=False)
except Exception as e:
_log.debug("Desktop cron tick error: %s", e)
stop_event.wait(interval)
provider = resolve_cron_scheduler()
_log.info("Desktop cron scheduler started (provider=%s, interval=%ds)", provider.name, interval)
provider.start(stop_event, interval=interval)
@asynccontextmanager
@ -7797,6 +7794,93 @@ async def delete_cron_job(job_id: str, profile: Optional[str] = None):
return {"ok": True}
def _fire_cron_job_for_profile(profile: str, job_id: str) -> bool:
    """Fire one cron job for ``profile`` through the resolved scheduler provider.

    While holding ``_CRON_PROFILE_LOCK`` the ``cron.jobs`` module globals
    (``CRON_DIR``, ``JOBS_FILE``, ``OUTPUT_DIR``) are pointed at the profile's
    ``cron`` directory, the provider's ``fire_due`` is invoked for ``job_id``,
    and the previous globals are restored afterwards, even if the fire raises.

    No live adapters or event loop are passed (``adapters=None``,
    ``loop=None``).

    Returns:
        The truthiness of whatever ``fire_due`` returned.
    """
    _profile_name, home = _cron_profile_home(profile)
    with _CRON_PROFILE_LOCK:
        from cron import jobs as cron_jobs
        from cron.scheduler_provider import resolve_cron_scheduler

        # Remember the current targets so they can be put back in ``finally``.
        saved_targets = (
            cron_jobs.CRON_DIR,
            cron_jobs.JOBS_FILE,
            cron_jobs.OUTPUT_DIR,
        )
        profile_cron_dir = home / "cron"
        cron_jobs.CRON_DIR = profile_cron_dir
        cron_jobs.JOBS_FILE = profile_cron_dir / "jobs.json"
        cron_jobs.OUTPUT_DIR = profile_cron_dir / "output"
        try:
            scheduler = resolve_cron_scheduler()
            fired = scheduler.fire_due(job_id, adapters=None, loop=None)
            return bool(fired)
        finally:
            (
                cron_jobs.CRON_DIR,
                cron_jobs.JOBS_FILE,
                cron_jobs.OUTPUT_DIR,
            ) = saved_targets
# Strong references to in-flight background fire tasks. The event loop keeps
# only weak references to tasks, so a task nobody else references can be
# garbage-collected before it finishes.
_CRON_FIRE_TASKS: set = set()


def _on_cron_fire_task_done(task) -> None:
    """Drop a finished fire task and log the exception it ended with, if any."""
    _CRON_FIRE_TASKS.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        _log.warning("Cron fire background task failed: %s", exc)


@app.post("/api/cron/fire")
async def cron_fire_webhook(request: Request):
    """Chronos managed-cron fire webhook (NAS -> agent).

    Authenticated by a short-lived NAS-minted JWT (verified by the pluggable
    Chronos fire-verifier), NOT the dashboard session cookie, so this path is
    in ``PUBLIC_API_PATHS`` to bypass the dashboard auth gate, and the JWT is
    the real gate. This is the inbound half of scale-to-zero managed cron: NAS
    POSTs here at fire time, the agent verifies, claims the job (store CAS, so
    at-most-once across replicas / on a NAS retry), runs it, and re-arms the
    next one-shot.

    Lives on the dashboard app (not the api_server adapter) because the
    dashboard is the agent's always-reachable public HTTP surface on hosted
    deployments; the gateway may be idle/scaled down.

    Returns 202 immediately and runs the job in the background so a long agent
    turn never trips NAS's HTTP timeout.

    Responses:
        401 when the verifier rejects the token.
        400 when the JSON body carries no ``job_id``.
        200 with status ``gone`` when no profile owns the job.
        202 with status ``accepted`` once the fire has been scheduled.
    """
    from plugins.cron.chronos.verify import get_fire_verifier

    auth = request.headers.get("Authorization", "")
    token = auth[7:].strip() if auth.startswith("Bearer ") else ""
    cfg = load_config()
    claims = get_fire_verifier()(
        token=token,
        expected_audience=cfg_get(cfg, "cron", "chronos", "expected_audience", default=""),
        jwks_or_key=cfg_get(cfg, "cron", "chronos", "nas_jwks_url", default="") or None,
        issuer=cfg_get(cfg, "cron", "chronos", "portal_url", default="") or None,
    )
    if claims is None:
        return JSONResponse({"error": "invalid fire token"}, status_code=401)

    try:
        body = await request.json()
    except Exception:
        # An unparsable body is treated like an empty one -> 400 below.
        body = {}
    job_id = (body or {}).get("job_id") if isinstance(body, dict) else None
    if not job_id:
        return JSONResponse({"error": "missing job_id"}, status_code=400)

    profile = _find_cron_job_profile(job_id)
    if not profile:
        # Job is gone (cancelled / completed) — nothing to fire. 200 so NAS
        # does not retry a fire that is intentionally absent.
        return JSONResponse({"status": "gone", "job_id": job_id}, status_code=200)

    # Run in the background; the store CAS claim inside fire_due de-dupes a
    # NAS/scheduler retry that arrives while this is in flight.
    fire_task = asyncio.create_task(
        asyncio.to_thread(_fire_cron_job_for_profile, profile, job_id)
    )
    # Keep the task alive until it completes and surface its failure in the
    # log instead of an unretrieved-exception warning at GC time.
    _CRON_FIRE_TASKS.add(fire_task)
    fire_task.add_done_callback(_on_cron_fire_task_done)
    return JSONResponse({"status": "accepted", "job_id": job_id}, status_code=202)
# ---------------------------------------------------------------------------
# Automation Blueprints — parameterized automation blueprints. The dashboard renders the
# slot schema as a form; submitting instantiates a real cron job via the same

344
plugins/cron/__init__.py Normal file
View file

@ -0,0 +1,344 @@
"""Cron scheduler provider plugin discovery.
Scans two directories for cron scheduler provider plugins:
1. Bundled providers: ``plugins/cron/<name>/`` (shipped with hermes-agent)
2. User-installed providers: ``$HERMES_HOME/plugins/<name>/``
Each subdirectory must contain ``__init__.py`` with a class implementing the
``CronScheduler`` ABC (``cron/scheduler_provider.py``). On name collisions,
bundled providers take precedence.
This is a near-verbatim clone of ``plugins/memory/__init__.py`` — the same
discovery/loader machinery, retargeted at ``CronScheduler``. The built-in
``InProcessCronScheduler`` is NOT discovered here: it is core (lives in
``cron/scheduler_provider.py``) so the fallback can never be accidentally
removed. Only NON-default providers (e.g. "chronos") live under this directory.
Only ONE provider can be active at a time, selected via ``cron.provider`` in
config.yaml (empty = built-in). See ``cron.scheduler_provider.resolve_cron_scheduler``.
Usage:
from plugins.cron import discover_cron_schedulers, load_cron_scheduler
available = discover_cron_schedulers() # [(name, desc, available), ...]
provider = load_cron_scheduler("chronos") # CronScheduler instance
"""
from __future__ import annotations
import importlib
import importlib.machinery
import importlib.util
import logging
import sys
from pathlib import Path
from typing import List, Optional, Tuple
logger = logging.getLogger(__name__)
_CRON_PLUGINS_DIR = Path(__file__).parent
# Synthetic parent package for user-installed providers, so they don't
# collide with bundled providers in sys.modules.
_USER_NAMESPACE = "_hermes_user_cron"
def _register_synthetic_package(name: str, search_locations: List[str]) -> None:
    """Install an empty package shell under ``name`` in ``sys.modules``.

    User-installed providers are imported as ``_hermes_user_cron.<name>``.
    That dotted name has no parent package on disk, so a relative import
    inside the plugin (``from . import config``) would fail with
    ``ModuleNotFoundError: No module named '_hermes_user_cron'`` unless the
    parent is already present in ``sys.modules``. The loader registers
    ``plugins`` and ``plugins.cron`` for bundled providers for the same
    reason.

    An existing ``sys.modules`` entry for ``name`` is left untouched.
    """
    if name not in sys.modules:
        shell_spec = importlib.machinery.ModuleSpec(name, loader=None, is_package=True)
        shell_spec.submodule_search_locations = search_locations
        shell_module = importlib.util.module_from_spec(shell_spec)
        sys.modules[name] = shell_module
# ---------------------------------------------------------------------------
# Directory helpers
# ---------------------------------------------------------------------------
def _get_user_plugins_dir() -> Optional[Path]:
    """Locate ``$HERMES_HOME/plugins/``.

    Returns the directory when it exists, and ``None`` when it does not or
    when anything goes wrong while resolving it.
    """
    try:
        from hermes_constants import get_hermes_home

        plugins_root = get_hermes_home() / "plugins"
        if plugins_root.is_dir():
            return plugins_root
        return None
    except Exception:
        return None
def _is_cron_provider_dir(path: Path) -> bool:
    """Guess whether *path* holds a cron scheduler provider plugin.

    The directory qualifies when the first 8192 characters of its
    ``__init__.py`` mention ``register_cron_scheduler`` or ``CronScheduler``.
    This is a plain text scan; the plugin is not imported. A missing or
    unreadable ``__init__.py`` means ``False``.
    """
    candidate = path / "__init__.py"
    if not candidate.exists():
        return False
    try:
        head = candidate.read_text(errors="replace")[:8192]
    except Exception:
        return False
    markers = ("register_cron_scheduler", "CronScheduler")
    return any(marker in head for marker in markers)
def _iter_provider_dirs() -> List[Tuple[str, Path]]:
    """Collect ``(name, path)`` for every discovered provider directory.

    Bundled providers (``plugins/cron/<name>/``) are listed first, followed by
    user-installed ones (``$HERMES_HOME/plugins/<name>/``). A user directory
    whose name matches a bundled provider is dropped, so bundled wins on
    collisions. Directories starting with ``_`` or ``.`` are ignored in both
    locations.
    """
    found: List[Tuple[str, Path]] = []
    bundled_names: set = set()

    # 1. Bundled providers: any sub-package shipped next to this file.
    if _CRON_PLUGINS_DIR.is_dir():
        for entry in sorted(_CRON_PLUGINS_DIR.iterdir()):
            if (
                entry.is_dir()
                and not entry.name.startswith(("_", "."))
                and (entry / "__init__.py").exists()
            ):
                bundled_names.add(entry.name)
                found.append((entry.name, entry))

    # 2. User-installed providers: the directory is shared with other plugin
    #    kinds, so only entries that look like cron providers are kept.
    user_root = _get_user_plugins_dir()
    if user_root:
        found.extend(
            (entry.name, entry)
            for entry in sorted(user_root.iterdir())
            if entry.is_dir()
            and not entry.name.startswith(("_", "."))
            and entry.name not in bundled_names
            and _is_cron_provider_dir(entry)
        )

    return found
def find_provider_dir(name: str) -> Optional[Path]:
    """Resolve a provider name to its directory.

    Checks bundled providers first, then user-installed ones.

    ``name`` must be a single path component. Empty names, ``.``/``..`` and
    names containing a path separator are rejected, because joining them onto
    the plugin roots would resolve to the plugin root itself or to a location
    outside it.

    Returns:
        The provider directory, or ``None`` when the name is invalid or no
        matching provider exists.
    """
    if (
        not isinstance(name, str)
        or not name
        or name in (".", "..")
        or "/" in name
        or "\\" in name
    ):
        return None

    # Bundled
    bundled = _CRON_PLUGINS_DIR / name
    if bundled.is_dir() and (bundled / "__init__.py").exists():
        return bundled

    # User-installed
    user_dir = _get_user_plugins_dir()
    if user_dir:
        user = user_dir / name
        if user.is_dir() and _is_cron_provider_dir(user):
            return user

    return None
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def discover_cron_schedulers() -> List[Tuple[str, str, bool]]:
    """Scan bundled and user-installed directories for available providers.

    Returns a list of ``(name, description, is_available)`` tuples. The list
    may be empty: the built-in scheduler is core and is not discovered here,
    so a checkout with no bundled non-default provider returns ``[]``.
    Bundled providers take precedence on name collisions.

    ``description`` comes from the provider's optional ``plugin.yaml`` and is
    always a string (empty when missing or unreadable). ``is_available`` is
    ``False`` when the provider cannot be loaded or its ``is_available()``
    raises.
    """
    results: List[Tuple[str, str, bool]] = []
    for name, child in _iter_provider_dirs():
        # Read description from plugin.yaml if available.
        desc = ""
        yaml_file = child / "plugin.yaml"
        if yaml_file.exists():
            try:
                import yaml

                with open(yaml_file, encoding="utf-8-sig") as f:
                    meta = yaml.safe_load(f) or {}
                # A bare ``description:`` key loads as None; keep the
                # documented ``str`` element type.
                desc = str(meta.get("description") or "")
            except Exception as e:
                logger.debug("Could not read %s: %s", yaml_file, e)

        # Quick availability check: load the provider and ask it.
        try:
            provider = _load_provider_from_dir(child)
            available = bool(provider.is_available()) if provider else False
        except Exception as e:
            logger.debug("Availability check failed for cron provider '%s': %s", name, e)
            available = False

        results.append((name, desc, available))
    return results
def load_cron_scheduler(name: str) -> Optional["CronScheduler"]:  # noqa: F821
    """Load the cron scheduler provider called ``name``.

    The provider directory is looked up through ``find_provider_dir``, which
    checks bundled providers (``plugins/cron/<name>/``) before user-installed
    ones (``$HERMES_HOME/plugins/<name>/``).

    Returns:
        The provider instance, or ``None`` when the provider is not found,
        yields no instance, or raises while loading.
    """
    provider_dir = find_provider_dir(name)
    if not provider_dir:
        logger.debug("Cron provider '%s' not found in bundled or user plugins", name)
        return None

    try:
        instance = _load_provider_from_dir(provider_dir)
        if not instance:
            logger.warning("Cron provider '%s' loaded but no provider instance found", name)
            return None
        return instance
    except Exception as e:
        logger.warning("Failed to load cron provider '%s': %s", name, e)
        return None
def _load_provider_from_dir(provider_dir: Path) -> Optional["CronScheduler"]:  # noqa: F821
    """Import a provider package and extract its ``CronScheduler`` instance.

    The package must offer one of:

    - a ``register(ctx)`` function (plugin-style); it is called with a
      ``_ProviderCollector`` that captures ``register_cron_scheduler``
    - a top-level class extending ``CronScheduler``; it is instantiated with
      no arguments

    A failed import leaves no trace in ``sys.modules``: a submodule that
    raises while being pre-loaded is removed again, and when the package's
    own ``__init__`` raises, the package and the submodules registered during
    this attempt are removed too.

    Returns:
        The provider instance, or ``None`` when there is no ``__init__.py``,
        the import fails, or no provider can be obtained.
    """
    name = provider_dir.name
    # Use a separate namespace for user-installed plugins so they don't
    # collide with bundled providers in sys.modules.
    _is_bundled = _CRON_PLUGINS_DIR in provider_dir.parents or provider_dir.parent == _CRON_PLUGINS_DIR
    module_name = f"plugins.cron.{name}" if _is_bundled else f"{_USER_NAMESPACE}.{name}"
    init_file = provider_dir / "__init__.py"
    if not init_file.exists():
        return None

    # Check if already loaded. A synthetic package shell has no __file__;
    # only reuse modules that were actually loaded from disk.
    cached = sys.modules.get(module_name)
    if cached is not None and getattr(cached, "__file__", None):
        mod = cached
    else:
        # Ensure the parent packages are registered (for relative imports).
        for parent in ("plugins", "plugins.cron"):
            if parent not in sys.modules:
                parent_path = Path(__file__).parent
                if parent == "plugins":
                    parent_path = parent_path.parent
                parent_init = parent_path / "__init__.py"
                if parent_init.exists():
                    spec = importlib.util.spec_from_file_location(
                        parent, str(parent_init),
                        submodule_search_locations=[str(parent_path)]
                    )
                    if spec:
                        parent_mod = importlib.util.module_from_spec(spec)
                        sys.modules[parent] = parent_mod
                        try:
                            spec.loader.exec_module(parent_mod)
                        except Exception as e:
                            # Best effort: the registered shell is enough to
                            # anchor relative imports.
                            logger.debug("Failed to exec parent package %s: %s", parent, e)

        # User-installed plugins need their synthetic parent registered the
        # same way, or relative imports inside the plugin cannot resolve.
        if not _is_bundled:
            _register_synthetic_package(_USER_NAMESPACE, [])

        # Now load the provider module.
        spec = importlib.util.spec_from_file_location(
            module_name, str(init_file),
            submodule_search_locations=[str(provider_dir)]
        )
        if not spec:
            return None

        mod = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = mod

        # Register submodules so relative imports work,
        # e.g. "from ._nas_client import NasCronClient" in the chronos plugin.
        registered_subs: List[str] = []
        for sub_file in provider_dir.glob("*.py"):
            if sub_file.name == "__init__.py":
                continue
            sub_name = sub_file.stem
            full_sub_name = f"{module_name}.{sub_name}"
            if full_sub_name not in sys.modules:
                sub_spec = importlib.util.spec_from_file_location(
                    full_sub_name, str(sub_file)
                )
                if sub_spec:
                    sub_mod = importlib.util.module_from_spec(sub_spec)
                    sys.modules[full_sub_name] = sub_mod
                    try:
                        sub_spec.loader.exec_module(sub_mod)
                    except Exception as e:
                        logger.debug("Failed to load submodule %s: %s", full_sub_name, e)
                        # Do not leave a half-initialised module behind: a
                        # later ``from .x import Y`` would pick it up and
                        # fail with a misleading "cannot import name".
                        sys.modules.pop(full_sub_name, None)
                    else:
                        registered_subs.append(full_sub_name)

        try:
            spec.loader.exec_module(mod)
        except Exception as e:
            logger.debug("Failed to exec_module %s: %s", module_name, e)
            sys.modules.pop(module_name, None)
            # Roll back the submodules registered for this failed attempt.
            for full_sub_name in registered_subs:
                sys.modules.pop(full_sub_name, None)
            return None

    # Try register(ctx) pattern first (how our plugins are written).
    if hasattr(mod, "register"):
        collector = _ProviderCollector()
        try:
            mod.register(collector)
            if collector.provider:
                return collector.provider
        except Exception as e:
            logger.debug("register() failed for %s: %s", name, e)

    # Fallback: find a CronScheduler subclass and instantiate it.
    from cron.scheduler_provider import CronScheduler

    for attr_name in dir(mod):
        attr = getattr(mod, attr_name, None)
        if (isinstance(attr, type) and issubclass(attr, CronScheduler)
                and attr is not CronScheduler):
            try:
                return attr()
            except Exception as e:
                # Keep looking: another subclass may be constructible.
                logger.debug("Could not instantiate %s.%s: %s", module_name, attr_name, e)

    return None
class _ProviderCollector:
    """Stand-in plugin context that records ``register_cron_scheduler`` calls.

    The captured provider is exposed as ``provider`` (``None`` until a
    scheduler is registered). Every other registration method accepts any
    arguments and does nothing.
    """

    def __init__(self):
        self.provider = None

    def register_cron_scheduler(self, provider):
        """Remember ``provider``; a later call replaces an earlier one."""
        self.provider = provider

    # The remaining registration methods are deliberately inert.

    def register_tool(self, *args, **kwargs):
        """Ignore tool registrations."""
        return None

    def register_hook(self, *args, **kwargs):
        """Ignore hook registrations."""
        return None

    def register_memory_provider(self, *args, **kwargs):
        """Ignore memory-provider registrations."""
        return None

    def register_cli_command(self, *args, **kwargs):
        """Ignore CLI-command registrations."""
        return None

View file

@ -0,0 +1,241 @@
"""Chronos — NAS-mediated managed cron provider (scale-to-zero).
Chronos (the Greek god of time, alongside Hermes) is the first non-default
``CronScheduler``. It lets a hosted gateway scale to zero while idle and still
fire cron jobs: instead of a 60s in-process ticker, it asks NAS to arm exactly
one external one-shot per job at that job's real next-fire time. NAS calls the
agent back at fire time over an authenticated webhook (``/api/cron/fire``); the
agent runs the job via the shared ``run_one_job`` body and re-arms the next
one-shot.
The external scheduler NAS uses is an internal NAS implementation detail —
Chronos names no vendor, holds no scheduler credentials, and speaks only to
NAS's ``agent-cron`` endpoints with the agent's existing Nous token.
Design constraints (see the plan's DQ-1):
- start() arms all enabled jobs and RETURNS; it never blocks and never spawns
a periodic wake. Between fires the machine is truly at zero.
- reconcile runs only on a warm process (start / on_jobs_changed / piggybacked
on a fire), never as a periodic wake of a sleeping machine.
Inert unless ``cron.provider: chronos``. ``resolve_cron_scheduler`` falls back
to the built-in if Chronos is unavailable, so cron never loses its trigger.
Wire contract: ``docs/chronos-managed-cron-contract.md``.
"""
from __future__ import annotations
import logging
import threading
from typing import Any, Dict, Optional
from cron.scheduler_provider import CronScheduler
logger = logging.getLogger("cron.chronos")
def _cfg(*keys: str, default: Any = "") -> Any:
    """Look up a config value by key path (e.g. ``"cron", "chronos", ...``).

    The config is loaded with ``load_config`` and queried with ``cfg_get``.
    Any failure along the way, including a failed import, yields ``default``.
    """
    try:
        from hermes_cli.config import cfg_get, load_config

        loaded = load_config()
        value = cfg_get(loaded, *keys, default=default)
    except Exception:
        value = default
    return value
class ChronosCronScheduler(CronScheduler):
    """NAS-mediated external cron provider.

    Rather than ticking in-process, this provider asks NAS (through
    ``NasCronClient``) to arm one one-shot per job at the job's
    ``next_run_at`` and re-arms the next one after each fire.
    """

    def __init__(self) -> None:
        # In-memory map of job_id → fire_at we've asked NAS to arm. Best-effort
        # cache; reconcile rebuilds desired state from jobs.json, so a cold
        # process simply re-arms (idempotent via dedup_key).
        self._armed: Dict[str, str] = {}
        self._lock = threading.Lock()  # guards self._armed
        self._client = None  # lazily constructed (no network in is_available)

    # -- identity / availability -----------------------------------------

    @property
    def name(self) -> str:
        """Provider name."""
        return "chronos"

    def is_available(self) -> bool:
        """Config presence only — NO network.

        Chronos needs a portal base URL, the agent's own publicly-reachable
        callback URL (for NAS→agent fires), and a usable Nous token (the agent
        is logged into the portal). If any is missing, resolve_cron_scheduler
        falls back to the built-in ticker.
        """
        if not (_cfg("cron", "chronos", "portal_url") and _cfg("cron", "chronos", "callback_url")):
            return False
        return self._have_nous_token()

    def _have_nous_token(self) -> bool:
        """True if the agent has a Nous Portal login (no network call).

        Checks the stored auth state for a Nous access token; does NOT refresh
        or hit the network (is_available must stay offline). The actual
        refresh-aware token is resolved lazily at provision time.
        """
        try:
            from hermes_cli.auth import get_provider_auth_state

            state = get_provider_auth_state("nous") or {}
            return bool(state.get("access_token"))
        except Exception:
            return False

    # -- client -----------------------------------------------------------

    def _get_client(self):
        """Return the ``NasCronClient``, constructing it on first use."""
        if self._client is None:
            from ._nas_client import NasCronClient

            self._client = NasCronClient(_cfg("cron", "chronos", "portal_url"))
        return self._client

    def _callback_url(self) -> str:
        """The configured ``cron.chronos.callback_url`` as a string ("" if unset)."""
        return str(_cfg("cron", "chronos", "callback_url") or "")

    # -- lifecycle --------------------------------------------------------

    def start(self, stop_event, *, adapters=None, loop=None, interval=60):
        """Arm all enabled jobs via NAS, then RETURN immediately.

        Does NOT block and does NOT spawn a 60s wake (DQ-1); that is the whole
        point of scale-to-zero. The machine wakes only on a NAS→agent fire.
        ``stop_event``, ``adapters``, ``loop`` and ``interval`` are accepted
        but unused here.
        """
        try:
            self.reconcile()
        except Exception as e:
            logger.warning("Chronos start() reconcile failed: %s", e)
        # Intentionally return — no loop, no periodic wake.

    def stop(self) -> None:
        """Nothing to stop: ``start`` leaves no loop or thread running."""
        return None

    def on_jobs_changed(self) -> None:
        """A job was created/updated/removed/paused/resumed — reconcile the NAS
        registry so the affected one-shot is (re-)armed or cancelled."""
        try:
            self.reconcile()
        except Exception as e:
            logger.debug("Chronos on_jobs_changed reconcile failed: %s", e)

    # -- arming -----------------------------------------------------------

    @staticmethod
    def _is_armable(job: Dict[str, Any]) -> bool:
        """True when ``job`` should have a one-shot armed.

        A job qualifies when it is enabled, has a ``next_run_at`` and is not in
        the ``paused`` state. ``reconcile`` and ``fire_due`` both use this, so
        they always agree on which jobs get armed.
        """
        return bool(
            job.get("enabled")
            and job.get("next_run_at")
            and job.get("state") != "paused"
        )

    def _arm_one_shot(self, job: Dict[str, Any]) -> None:
        """Ask NAS to arm exactly one one-shot at the job's next_run_at.

        The agent computes the time; NAS+its scheduler are the dumb executor.
        Idempotent per (job_id, fire_at) via dedup_key, so re-arming the same
        fire is a no-op NAS-side. Does nothing when the job has no
        ``next_run_at``. Exceptions raised by the provision call propagate.
        """
        job_id = job["id"]
        fire_at = job.get("next_run_at")
        if not fire_at:
            return
        dedup_key = f"{job_id}:{fire_at}"
        self._get_client().provision(
            job_id=job_id,
            fire_at=fire_at,
            agent_callback_url=self._callback_url(),
            dedup_key=dedup_key,
        )
        with self._lock:
            self._armed[job_id] = fire_at

    def _cancel(self, job_id: str) -> None:
        """Ask NAS to cancel the one-shot for ``job_id``.

        The job is dropped from the in-memory map whether or not the NAS call
        succeeds; a failure of that call still propagates to the caller.
        """
        try:
            self._get_client().cancel(job_id=job_id)
        finally:
            with self._lock:
                self._armed.pop(job_id, None)

    def _list_armed(self) -> Dict[str, str]:
        """Observed armed one-shots: job_id → fire_at.

        Prefer the in-memory map (warm process); on a cold/empty map, ask NAS
        (best-effort). If the NAS list fails, return an empty map; reconcile
        then re-arms desired jobs idempotently.
        """
        with self._lock:
            if self._armed:
                return dict(self._armed)
        try:
            observed = {
                item["job_id"]: item.get("fire_at", "")
                for item in self._get_client().list_armed()
                if item.get("job_id")
            }
            with self._lock:
                self._armed.update(observed)
            return observed
        except Exception as e:
            logger.debug("Chronos _list_armed failed (will re-arm idempotently): %s", e)
            return {}

    # -- reconcile --------------------------------------------------------

    def reconcile(self) -> None:
        """Converge the NAS-armed one-shots toward jobs.json (desired state):
        arm missing / re-arm changed-time, cancel orphaned.

        Arm and cancel failures are logged per job and do not abort the pass.
        """
        from cron.jobs import get_job, load_jobs

        desired: Dict[str, str] = {
            j["id"]: j["next_run_at"]
            for j in load_jobs()
            if self._is_armable(j)
        }
        observed = self._list_armed()

        # Arm missing or changed-time.
        for job_id, fire_at in desired.items():
            if observed.get(job_id) == fire_at:
                continue
            # Re-fetch the full job dict to arm (need the whole record). The
            # job may have been removed, paused or disabled since load_jobs().
            job = get_job(job_id)
            if not job or not self._is_armable(job):
                continue
            try:
                self._arm_one_shot(job)
            except Exception as e:
                logger.warning("Chronos failed to arm job %s: %s", job_id, e)

        # Cancel orphans (armed but no longer desired).
        for job_id in list(observed):
            if job_id not in desired:
                try:
                    self._cancel(job_id)
                except Exception as e:
                    logger.warning("Chronos failed to cancel orphan %s: %s", job_id, e)

    # -- fire -------------------------------------------------------------

    def fire_due(self, job_id: str, *, adapters: Any = None, loop: Any = None) -> bool:
        """Run the due job (claim + run_one_job via the ABC default), then
        re-arm the NEXT one-shot through NAS.

        Re-arm happens AFTER the run so next_run_at reflects the completed fire.
        If the job is gone (one-shot completed / repeat-N exhausted), get_job
        returns None, so there is nothing to re-arm (the schedule naturally
        stops). A job that is disabled or paused is not re-armed either.

        Returns:
            Whatever the base-class ``fire_due`` returned.
        """
        ran = super().fire_due(job_id, adapters=adapters, loop=loop)
        if ran:
            from cron.jobs import get_job

            job = get_job(job_id)
            if job and self._is_armable(job):
                try:
                    self._arm_one_shot(job)
                except Exception as e:
                    logger.warning("Chronos failed to re-arm job %s after fire: %s", job_id, e)
        return ran
def register(ctx) -> None:
    """Plugin entrypoint: hand a fresh Chronos provider to the loader.

    ``ctx`` is the plugin context supplied by the loader; the provider is
    passed to its ``register_cron_scheduler`` method.
    """
    provider = ChronosCronScheduler()
    ctx.register_cron_scheduler(provider)

View file

@ -0,0 +1,123 @@
"""Thin HTTP client for the agent → NAS ``agent-cron`` endpoints (Chronos).
The Chronos provider speaks ONLY to NAS — it names no scheduler vendor and
holds no scheduler credentials. NAS owns the external scheduler (an internal
implementation detail) and that scheduler's account; the agent just asks NAS to
"arm a one-shot at time T" / "cancel" / "list", authenticated with the agent's
existing Nous Portal access token (the same token it already uses to call the
portal no new secret).
Wire contract: ``docs/chronos-managed-cron-contract.md``.
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
logger = logging.getLogger("cron.chronos")
# Endpoint paths under the portal base URL.
_PROVISION_PATH = "/api/agent-cron/provision"
_CANCEL_PATH = "/api/agent-cron/cancel"
_LIST_PATH = "/api/agent-cron/list"
class NasCronClientError(RuntimeError):
    """Raised when a NAS agent-cron call fails (non-2xx or transport error).

    The message names the HTTP method and endpoint path and carries either the
    underlying transport error or the status code with the first 200
    characters of the response body.
    """
class NasCronClient:
"""Minimal client for the agent→NAS provision/cancel/list endpoints.
Uses the agent's refresh-aware Nous access token for auth. No scheduler
vendor, no scheduler creds NAS hides all of that behind these three calls.
"""
def __init__(self, portal_url: str, *, timeout_seconds: float = 15.0) -> None:
    """Store the portal base URL (trailing slashes removed) and the timeout.

    ``timeout_seconds`` is passed to every HTTP request as its timeout.
    """
    base_url = portal_url.rstrip("/")
    self.portal_url = base_url
    self.timeout_seconds = timeout_seconds
# -- auth -------------------------------------------------------------
def _access_token(self) -> str:
"""The agent's existing Nous Portal access token (refresh-aware)."""
from hermes_cli.auth import resolve_nous_access_token
return resolve_nous_access_token()
def _headers(self) -> Dict[str, str]:
return {
"Authorization": f"Bearer {self._access_token()}",
"Content-Type": "application/json",
}
# -- HTTP -------------------------------------------------------------
def _post(self, path: str, body: Dict[str, Any]) -> Dict[str, Any]:
import requests # lazy: agent already depends on requests
url = f"{self.portal_url}{path}"
try:
resp = requests.post(
url, json=body, headers=self._headers(), timeout=self.timeout_seconds
)
except Exception as e:
raise NasCronClientError(f"POST {path} failed: {e}") from e
if resp.status_code // 100 != 2:
raise NasCronClientError(
f"POST {path} returned {resp.status_code}: {resp.text[:200]}"
)
try:
return resp.json() if resp.content else {}
except Exception:
return {}
def _get(self, path: str, params: Dict[str, Any]) -> Dict[str, Any]:
import requests
url = f"{self.portal_url}{path}"
try:
resp = requests.get(
url, params=params, headers=self._headers(), timeout=self.timeout_seconds
)
except Exception as e:
raise NasCronClientError(f"GET {path} failed: {e}") from e
if resp.status_code // 100 != 2:
raise NasCronClientError(
f"GET {path} returned {resp.status_code}: {resp.text[:200]}"
)
try:
return resp.json() if resp.content else {}
except Exception:
return {}
# -- endpoints --------------------------------------------------------
def provision(self, *, job_id: str, fire_at: str, agent_callback_url: str,
dedup_key: str) -> Dict[str, Any]:
"""Ask NAS to arm a one-shot for ``job_id`` at ``fire_at`` (ISO 8601).
``dedup_key`` (``{job_id}:{fire_at}``) makes re-arming the same fire
idempotent NAS-side. Returns the NAS response (e.g. ``{schedule_id}``).
"""
return self._post(_PROVISION_PATH, {
"job_id": job_id,
"fire_at": fire_at,
"agent_callback_url": agent_callback_url,
"dedup_key": dedup_key,
})
def cancel(self, *, job_id: str) -> Dict[str, Any]:
"""Ask NAS to cancel any armed one-shot for ``job_id``."""
return self._post(_CANCEL_PATH, {"job_id": job_id})
def list_armed(self) -> List[Dict[str, Any]]:
"""List the one-shots NAS currently has armed for this agent.
Returns a list of ``{job_id, fire_at, schedule_id}``. Best-effort: used
by reconcile to find orphaned arms on a cold process; on error the
caller falls back to idempotent re-arm of all desired jobs.
"""
data = self._get(_LIST_PATH, {})
items = data.get("armed") if isinstance(data, dict) else None
return items if isinstance(items, list) else []

View file

@ -0,0 +1,9 @@
name: chronos
description: >-
Chronos — NAS-mediated managed cron provider for scale-to-zero hosted agents.
Delegates the "wake me at time T" trigger to Nous infrastructure so an idle
gateway can scale to zero and still fire cron jobs. The agent computes each
job's next-fire time and asks NAS to arm a one-shot; NAS calls the agent back
at fire time over an authenticated webhook. Inert unless cron.provider=chronos.
version: 1.0.0
author: Nous Research

View file

@ -0,0 +1,103 @@
"""Inbound cron-fire token verification for Chronos (Phase 4E.1).
When NAS relays an external scheduler fire to the agent, it POSTs
``/api/cron/fire`` with a short-lived NAS-minted JWT. This module verifies that
JWT before any job runs — the security boundary for remotely-triggered job
execution.
We verify a NAS-minted JWT (the trust path the agent already has) rather than
let an external scheduler call the agent directly: the scheduler signs with
NAS's keys, which the agent doesn't (and shouldn't) hold. See the plan's DQ-4.
The verifier is pluggable (``get_fire_verifier``) so the escape-hatch mode
(direct per-job cron-key) can swap in later with no handler change.
Crypto is delegated to PyJWT (already a declared dependency) — we do NOT
hand-roll JWT verification.
"""
from __future__ import annotations
import logging
from typing import Any, Callable, Dict, Optional
# Module-level logger; token-verification failures are reported through it
# as warnings.
logger = logging.getLogger("cron.chronos.verify")
# The purpose claim that scopes a token to the fire endpoint. A general agent
# JWT (without this claim) must NOT be replayable against /api/cron/fire.
_FIRE_PURPOSE = "cron_fire"
def verify_nas_fire_token(
    *,
    token: str,
    expected_audience: str,
    jwks_or_key: Optional[str] = None,
    issuer: Optional[str] = None,
    leeway_seconds: int = 30,
) -> Optional[Dict[str, Any]]:
    """Validate a NAS-minted cron-fire JWT and return its claims, else None.

    Args:
        token: The bearer JWT presented to the fire endpoint.
        expected_audience: Audience the token must carry (this agent).
        jwks_or_key: Either an ``http(s)://`` JWKS URL (the signing key is
            looked up from the token) or an inline public key used as-is.
        issuer: When set, passed to the decoder as the required issuer.
        leeway_seconds: Clock-skew leeway handed to the decoder.

    The token is accepted only when all of the following hold:
      - a verification key is configured (no unsigned fallback);
      - it decodes under one of the asymmetric algorithms listed below,
        with ``exp`` and ``aud`` present and ``aud`` matching;
      - the issuer matches, when one is configured;
      - its ``purpose`` claim equals ``"cron_fire"``, so a general agent JWT
        cannot be replayed against the fire endpoint.

    Returns:
        The decoded claims on success. Any failure is logged as a warning and
        reported as ``None`` so the handler can answer 401 without revealing
        which check failed.
    """
    if not (token and expected_audience):
        return None
    if not jwks_or_key:
        # Without a key there is nothing to verify against; refuse rather
        # than decode unsigned at a security boundary.
        logger.warning("cron fire: no JWKS/key configured; refusing token")
        return None

    try:
        import jwt
        from jwt import PyJWKClient

        if jwks_or_key.startswith(("http://", "https://")):
            # JWKS endpoint: pick the signing key matching the token header.
            signing_key = PyJWKClient(jwks_or_key).get_signing_key_from_jwt(token).key
        else:
            # Inline PEM public key (test / pinned-key deployments).
            signing_key = jwks_or_key

        decode_kwargs: Dict[str, Any] = {
            "algorithms": ["RS256", "RS384", "RS512", "ES256", "ES384"],
            "audience": expected_audience,
            "leeway": leeway_seconds,
            "options": {"require": ["exp", "aud"]},
        }
        if issuer:
            decode_kwargs["issuer"] = issuer
        claims = jwt.decode(token, signing_key, **decode_kwargs)
    except Exception as e:
        logger.warning("cron fire: token verification failed: %s", e)
        return None

    if claims.get("purpose") == _FIRE_PURPOSE:
        return claims
    logger.warning("cron fire: token missing/!=%s purpose claim", _FIRE_PURPOSE)
    return None
def get_fire_verifier() -> Callable[..., Optional[Dict[str, Any]]]:
    """Return the callable used to verify inbound fire tokens.

    Currently always the NAS-JWT verifier. This indirection exists so the
    DQ-4 escape hatch (direct per-job cron-key) could return a different
    verifier here, selected by config, without the webhook handler changing.
    """
    active_verifier = verify_nas_fire_token
    return active_verifier

View file

@ -0,0 +1,84 @@
"""Tests for the store-level CAS fire claim (Phase 4C).
`claim_job_for_fire` gives multi-machine at-most-once semantics when an external
scheduler (Chronos) fires a job: across N gateway replicas, exactly ONE wins the
claim for a given fire. Single-machine deployments always win (unaffected).
These exercise the real store against a temp HERMES_HOME (no mocks) per the
E2E-over-mocks discipline for file-touching code.
"""
import pytest
@pytest.fixture
def temp_home(tmp_path, monkeypatch):
    """Point HERMES_HOME at a per-test temp dir so the real jobs.json is untouched."""
    home = str(tmp_path)
    # cron.jobs caches no home at import; get_hermes_home() reads the env live.
    monkeypatch.setenv("HERMES_HOME", home)
    yield tmp_path
def test_claim_succeeds_once_then_blocks(temp_home):
    """The first claim wins, the second loses, and next_run_at has moved on
    (so a re-delivery for the old time cannot fire again)."""
    from cron.jobs import create_job, claim_job_for_fire, get_job

    job_id = create_job(prompt="x", schedule="every 5m", name="t")["id"]
    next_run_before = get_job(job_id)["next_run_at"]

    won_first = claim_job_for_fire(job_id)
    assert won_first is True
    won_second = claim_job_for_fire(job_id)
    assert won_second is False

    next_run_after = get_job(job_id)["next_run_at"]
    assert next_run_after != next_run_before
def test_claim_oneshot_cannot_be_double_claimed(temp_home):
    """For a one-shot job the fresh claim blocks an immediate second claim."""
    from cron.jobs import create_job, claim_job_for_fire

    job_id = create_job(prompt="x", schedule="30m", name="o")["id"]
    outcomes = []
    outcomes.append(claim_job_for_fire(job_id))
    assert outcomes[0] is True
    outcomes.append(claim_job_for_fire(job_id))
    assert outcomes[1] is False
def test_claim_unknown_job_returns_false(temp_home):
    """Claiming an id that is not in the store reports a lost claim."""
    from cron.jobs import claim_job_for_fire

    missing_id = "nope-does-not-exist"
    assert claim_job_for_fire(missing_id) is False
def test_claim_paused_job_returns_false(temp_home):
    """Once a job is paused it can no longer be claimed."""
    from cron.jobs import create_job, claim_job_for_fire, pause_job

    job_id = create_job(prompt="x", schedule="every 5m", name="p")["id"]
    pause_job(job_id)

    claimed = claim_job_for_fire(job_id)
    assert claimed is False
def test_stale_claim_is_reclaimable(temp_home, monkeypatch):
    """A claim past its TTL can be taken over, so a fire is not stuck forever
    when the winner crashed before mark_job_run cleared the claim."""
    from cron.jobs import create_job, claim_job_for_fire

    job_id = create_job(prompt="x", schedule="every 5m", name="s")["id"]

    first = claim_job_for_fire(job_id)
    assert first is True

    # A TTL of zero seconds makes the existing claim count as stale.
    takeover = claim_job_for_fire(job_id, claim_ttl_seconds=0)
    assert takeover is True
def test_mark_job_run_clears_claim(temp_home):
    """Completing a recurring job drops its claim, making the next fire
    claimable again."""
    from cron.jobs import create_job, claim_job_for_fire, mark_job_run, get_job

    job_id = create_job(prompt="x", schedule="every 5m", name="c")["id"]

    assert claim_job_for_fire(job_id) is True
    claim_while_running = get_job(job_id).get("fire_claim")
    assert claim_while_running is not None

    mark_job_run(job_id, success=True)
    claim_after_run = get_job(job_id).get("fire_claim")
    assert claim_after_run is None

    # The re-armed recurring job accepts a new claim.
    assert claim_job_for_fire(job_id) is True

View file

@ -0,0 +1,101 @@
"""Tests for on_jobs_changed wiring (Phase 4F.1).
After a store mutation via the consumer surfaces (model tool / CLI / REST), the
active scheduler provider's on_jobs_changed() must be invoked so an external
provider (Chronos) re-provisions/cancels. The built-in's no-op default means
the default path is unchanged.
"""
import pytest
@pytest.fixture
def temp_home(tmp_path, monkeypatch):
    """Redirect HERMES_HOME to the per-test temp directory and yield it."""
    home = str(tmp_path)
    monkeypatch.setenv("HERMES_HOME", home)
    yield tmp_path
def test_notify_helper_calls_provider_on_jobs_changed(monkeypatch):
    """_notify_provider_jobs_changed resolves the provider and invokes its
    on_jobs_changed hook exactly once."""
    import cron.scheduler_provider as sp
    import cron.scheduler as sched

    notifications = []

    class Spy(sp.CronScheduler):
        @property
        def name(self):
            return "spy"

        def start(self, stop_event, **kw):
            pass

        def on_jobs_changed(self):
            notifications.append(1)

    def resolve_spy():
        return Spy()

    monkeypatch.setattr(sp, "resolve_cron_scheduler", resolve_spy)
    sched._notify_provider_jobs_changed()

    assert notifications == [1]
def test_notify_helper_swallows_provider_errors(monkeypatch):
    """An exception raised by on_jobs_changed stays inside the notify helper
    (the notification is best-effort)."""
    import cron.scheduler_provider as sp
    import cron.scheduler as sched

    class Boom(sp.CronScheduler):
        @property
        def name(self):
            return "boom"

        def start(self, stop_event, **kw):
            pass

        def on_jobs_changed(self):
            raise RuntimeError("kaboom")

    def resolve_boom():
        return Boom()

    monkeypatch.setattr(sp, "resolve_cron_scheduler", resolve_boom)
    # Passing means the call below returned without raising.
    sched._notify_provider_jobs_changed()
def test_builtin_notify_is_harmless(monkeypatch):
    """Under default provider resolution (the built-in) the notify helper
    completes without raising."""
    import cron.scheduler as sched

    notify = sched._notify_provider_jobs_changed
    # Default resolution → built-in; the only expectation is "no exception".
    notify()
def test_tool_create_notifies_provider(temp_home, monkeypatch):
    """A create through the cronjob tool triggers the jobs-changed notify."""
    import json
    import cron.scheduler as sched

    seen = []

    def record_change():
        seen.append("changed")

    monkeypatch.setattr(sched, "_notify_provider_jobs_changed", record_change)

    from tools.cronjob_tools import cronjob

    raw = cronjob(action="create", prompt="echo hi", schedule="every 5m", name="w")
    result = json.loads(raw)

    assert result["success"] is True
    assert seen == ["changed"]
def test_tool_remove_notifies_provider(temp_home, monkeypatch):
    """A remove through the cronjob tool triggers the jobs-changed notify."""
    import json
    from tools.cronjob_tools import cronjob

    # Create first (un-spied) so only the remove is observed below.
    creation = json.loads(
        cronjob(action="create", prompt="x", schedule="every 5m", name="r")
    )
    job_id = creation["job_id"]

    import cron.scheduler as sched

    seen = []

    def record_change():
        seen.append("changed")

    monkeypatch.setattr(sched, "_notify_provider_jobs_changed", record_change)

    removal = json.loads(cronjob(action="remove", job_id=job_id))

    assert removal["success"] is True
    assert seen == ["changed"]

View file

@ -0,0 +1,119 @@
"""Characterization + unit tests for the `run_one_job` shared helper (Phase 4A).
`tick`'s per-job body (`_process_job`) is the execute → save → deliver → mark
sequence that fires ONE due job. Phase 4A extracts it into a module-level
`run_one_job(job, *, adapters=None, loop=None, verbose=False)` so the external
Chronos provider's `fire_due` can reuse the IDENTICAL body — no duplicated
correctness.
The first test characterizes the sequence as driven through `tick()` (proving
the extraction didn't change `tick`'s behavior); the rest unit-test the
extracted helper directly.
"""
import cron.scheduler as s
def _patch_pipeline(monkeypatch, *, success=True, output="out", final="final response",
                    error=None, silent_marker_in=None):
    """Replace the four pipeline primitives on ``s`` with recording fakes.

    The fakes stand in for ``run_job``, ``save_job_output``,
    ``_deliver_result`` and ``mark_job_run``. Each appends a tuple to the
    returned list when called, so tests can assert on the call order.
    ``silent_marker_in``, when given, replaces ``final`` as the final
    response reported by the fake ``run_job``.
    """
    recorded = []

    def _run_job(job):
        recorded.append(("run_job", job["id"]))
        if silent_marker_in is None:
            final_response = final
        else:
            final_response = silent_marker_in
        return (success, output, final_response, error)

    def _save(jid, out):
        recorded.append(("save", jid))
        return f"/tmp/{jid}.txt"

    def _deliver(job, content, adapters=None, loop=None):
        recorded.append(("deliver", job["id"]))
        return None

    def _mark(jid, ok, err=None, delivery_error=None):
        recorded.append(("mark", jid, ok))

    replacements = (
        ("run_job", _run_job),
        ("save_job_output", _save),
        ("_deliver_result", _deliver),
        ("mark_job_run", _mark),
    )
    for attr, fake in replacements:
        monkeypatch.setattr(s, attr, fake)
    return recorded
def test_tick_process_job_sequence(monkeypatch):
    """Characterization: one due job driven through tick() runs
    run_job → save → deliver → mark, in that order."""
    recorded = _patch_pipeline(monkeypatch)

    def one_due_job():
        return [{"id": "j1", "name": "t"}]

    def always_advance(jid):
        return True

    monkeypatch.setattr(s, "get_due_jobs", one_due_job)
    monkeypatch.setattr(s, "advance_next_run", always_advance)

    s.tick(verbose=False, sync=True)

    step_names = [entry[0] for entry in recorded]
    assert step_names == ["run_job", "save", "deliver", "mark"]
    assert recorded[-1] == ("mark", "j1", True)
def test_run_one_job_success_sequence(monkeypatch):
    """Called directly, run_one_job performs execute→save→deliver→mark for a
    successful job and returns True."""
    recorded = _patch_pipeline(monkeypatch)

    result = s.run_one_job({"id": "j2", "name": "t"})

    assert result is True
    step_names = [entry[0] for entry in recorded]
    assert step_names == ["run_job", "save", "deliver", "mark"]
    assert recorded[-1] == ("mark", "j2", True)
def test_run_one_job_silent_skips_delivery(monkeypatch):
    """With a [SILENT] final response the output is saved and the run is
    marked, but nothing is delivered."""
    recorded = _patch_pipeline(monkeypatch, silent_marker_in="[SILENT]")

    s.run_one_job({"id": "j3", "name": "t"})

    step_names = [entry[0] for entry in recorded]
    assert "run_job" in step_names and "save" in step_names and "mark" in step_names
    assert "deliver" not in step_names
def test_run_one_job_empty_response_is_soft_failure(monkeypatch):
    """A whitespace-only final response gets the run marked NOT ok (issue #8585)."""
    recorded = _patch_pipeline(monkeypatch, final="   ")

    s.run_one_job({"id": "j4", "name": "t"})

    mark_entries = [entry for entry in recorded if entry[0] == "mark"]
    assert mark_entries[0] == ("mark", "j4", False)
def test_run_one_job_failed_job_delivers_error(monkeypatch):
    """A failing job is still delivered (as an error notice) and marked not-ok."""
    recorded = _patch_pipeline(monkeypatch, success=False, final="", error="boom")

    s.run_one_job({"id": "j5", "name": "t"})

    step_names = [entry[0] for entry in recorded]
    # Failures always deliver.
    assert "deliver" in step_names
    mark_entries = [entry for entry in recorded if entry[0] == "mark"]
    assert mark_entries[0] == ("mark", "j5", False)
def test_run_one_job_exception_marks_failure(monkeypatch):
    """When run_job raises, run_one_job records a failed run and returns
    False instead of letting the exception escape."""
    recorded_marks = []

    def exploding_run_job(job):
        raise RuntimeError("kaboom")

    def record_mark(jid, ok, err=None, delivery_error=None):
        recorded_marks.append((jid, ok))

    monkeypatch.setattr(s, "run_job", exploding_run_job)
    monkeypatch.setattr(s, "mark_job_run", record_mark)

    result = s.run_one_job({"id": "j6", "name": "t"})

    assert result is False
    assert recorded_marks == [("j6", False)]

View file

@ -0,0 +1,334 @@
"""Characterization tests for the cron trigger before/after the provider refactor.
These lock the CURRENT in-process-ticker contract (Phase 0 of the pluggable
CronScheduler plan, .hermes/plans/cron-scheduler-provider-interface.md). They
must pass unchanged on `main` now, and after every subsequent phase of the
refactor — they are the regression harness that proves the built-in firing
behavior is byte-for-byte preserved when the ticker is moved behind the
CronScheduler provider interface.
No production code is exercised beyond the two ticker entry points:
- gateway/run.py::_start_cron_ticker (production gateway ticker)
- hermes_cli/web_server.py::_start_desktop_cron_ticker (desktop fallback)
Both call `cron.scheduler.tick(...)` on a loop and exit when their stop_event
is set. We patch `cron.scheduler.tick` (both tickers import it locally as
`cron_tick`, so the module-attribute patch is observed) and assert the loop
drives it and stops promptly.
"""
import threading
import time
from unittest.mock import patch
def test_ticker_calls_tick_at_least_once_then_stops():
    """The gateway ticker loop keeps calling cron.scheduler.tick and returns
    promptly after its stop_event is set."""
    from gateway.run import _start_cron_ticker

    tick_kwargs = []
    stop_event = threading.Event()

    def recording_tick(*args, **kwargs):
        tick_kwargs.append(kwargs)
        return 0

    with patch("cron.scheduler.tick", side_effect=recording_tick):
        # interval=0 makes the loop spin without waiting; let it run briefly.
        worker = threading.Thread(
            target=_start_cron_ticker,
            args=(stop_event,),
            kwargs={"interval": 0},
            daemon=True,
        )
        worker.start()
        time.sleep(0.2)
        stop_event.set()
        worker.join(timeout=5)

    assert not worker.is_alive(), "ticker did not exit after stop_event was set"
    assert len(tick_kwargs) >= 1, "ticker never called tick()"
    # Contract: the background ticker passes sync=False (fire-and-forget),
    # never the synchronous CLI path.
    first_call = tick_kwargs[0]
    assert first_call.get("sync") is False
def test_desktop_ticker_calls_tick_then_stops():
    """The desktop dashboard ticker loop calls cron.scheduler.tick and
    returns once its stop_event is set. Desktop has no live adapters, so it
    ticks without adapters/loop."""
    from hermes_cli.web_server import _start_desktop_cron_ticker

    tick_kwargs = []
    stop_event = threading.Event()

    def recording_tick(*args, **kwargs):
        tick_kwargs.append(kwargs)
        return 0

    with patch("cron.scheduler.tick", side_effect=recording_tick):
        worker = threading.Thread(
            target=_start_desktop_cron_ticker,
            args=(stop_event,),
            kwargs={"interval": 0},
            daemon=True,
        )
        worker.start()
        time.sleep(0.2)
        stop_event.set()
        worker.join(timeout=5)

    assert not worker.is_alive(), "desktop ticker did not exit after stop_event was set"
    assert len(tick_kwargs) >= 1, "desktop ticker never called tick()"
    first_call = tick_kwargs[0]
    assert first_call.get("sync") is False
# ── Phase 1: CronScheduler ABC + InProcessCronScheduler ──────────────────────
def test_cronscheduler_is_abstract():
    """Instantiating the bare ABC fails with TypeError (name and start are
    abstract)."""
    import pytest
    from cron.scheduler_provider import CronScheduler

    expect_type_error = pytest.raises(TypeError)
    with expect_type_error:
        CronScheduler()
def test_cronscheduler_default_is_available_true():
    """A subclass that implements only name + start inherits
    is_available() == True."""
    from cron.scheduler_provider import CronScheduler

    class Dummy(CronScheduler):
        @property
        def name(self):
            return "dummy"

        def start(self, stop_event, **kw):
            pass

    provider = Dummy()
    assert provider.is_available() is True
def test_abc_growth_stays_additive():
    """Forward-compat guard: the ABC's REQUIRED surface is exactly name + start.

    Optional hooks added for the external provider
    (on_jobs_changed/fire_due/reconcile) must carry a default and stay
    NON-abstract, so the built-in keeps satisfying the ABC without overriding
    them. This test fails loudly if a future hook is made abstract, which
    would force every provider, including the built-in, to implement it.
    """
    from cron.scheduler_provider import CronScheduler

    declared = getattr(CronScheduler, "__abstractmethods__", set())
    abstract = set(declared)
    required_surface = {"name", "start"}
    assert abstract == required_surface, (
        f"CronScheduler abstractmethods changed to {abstract}; growth must be "
        "additive (optional methods with defaults), not new abstract methods."
    )
def test_inprocess_provider_ticks_and_stops():
    """The built-in provider drives cron.scheduler.tick(sync=False) in a loop
    and returns promptly once stop_event is set — the same contract as the
    raw ticker characterized above."""
    from cron.scheduler_provider import InProcessCronScheduler

    tick_kwargs = []
    stop_event = threading.Event()
    provider = InProcessCronScheduler()
    assert provider.name == "builtin"

    def recording_tick(*a, **k):
        tick_kwargs.append(k)
        return 0

    with patch("cron.scheduler.tick", side_effect=recording_tick):
        worker = threading.Thread(
            target=provider.start,
            args=(stop_event,),
            kwargs={"interval": 0},
            daemon=True,
        )
        worker.start()
        time.sleep(0.2)
        stop_event.set()
        worker.join(timeout=5)

    assert not worker.is_alive(), "provider did not exit after stop_event was set"
    assert len(tick_kwargs) >= 1, "provider never called tick()"
    first_call = tick_kwargs[0]
    assert first_call.get("sync") is False
def test_inprocess_provider_stop_is_noop():
    """stop() on the built-in returns None; the stop_event is what actually
    stops it."""
    from cron.scheduler_provider import InProcessCronScheduler

    provider = InProcessCronScheduler()
    assert provider.stop() is None
# ── Phase 2: config key, discovery, resolver ─────────────────────────────────
def test_default_config_cron_provider_is_empty():
    """DEFAULT_CONFIG ships cron.provider as the empty string (= built-in)."""
    from hermes_cli.config import DEFAULT_CONFIG

    cron_section = DEFAULT_CONFIG["cron"]
    assert cron_section["provider"] == ""
def test_discover_cron_schedulers_returns_list():
    """Discovery yields a list, possibly empty: the built-in is core rather
    than discovered, and no bundled non-default provider ships yet."""
    from plugins.cron import discover_cron_schedulers

    discovered = discover_cron_schedulers()
    assert isinstance(discovered, list)
def test_load_unknown_cron_scheduler_returns_none():
    """Loading a provider name that does not exist yields None."""
    from plugins.cron import load_cron_scheduler

    loaded = load_cron_scheduler("does-not-exist-xyz")
    assert loaded is None
def test_resolve_defaults_to_builtin(monkeypatch):
    """An empty cron.provider resolves to the built-in."""
    import hermes_cli.config as cfg
    from cron import scheduler_provider as sp

    def fake_load_config():
        return {"cron": {"provider": ""}}

    monkeypatch.setattr(cfg, "load_config", fake_load_config)

    resolved = sp.resolve_cron_scheduler()
    assert resolved.name == "builtin"
def test_resolve_no_cron_section_falls_back_to_builtin(monkeypatch):
    """A config without any cron section resolves to the built-in."""
    import hermes_cli.config as cfg
    from cron import scheduler_provider as sp

    def fake_load_config():
        return {}

    monkeypatch.setattr(cfg, "load_config", fake_load_config)

    resolved = sp.resolve_cron_scheduler()
    assert resolved.name == "builtin"
def test_resolve_unknown_provider_falls_back_to_builtin(monkeypatch):
    """Naming a provider that does not exist resolves to the built-in
    (cron never dies)."""
    import hermes_cli.config as cfg
    from cron import scheduler_provider as sp

    def fake_load_config():
        return {"cron": {"provider": "nope-not-real"}}

    monkeypatch.setattr(cfg, "load_config", fake_load_config)

    resolved = sp.resolve_cron_scheduler()
    assert resolved.name == "builtin"
def test_resolve_unavailable_provider_falls_back(monkeypatch):
    """A provider that loads but answers is_available() == False is passed
    over in favour of the built-in."""
    import hermes_cli.config as cfg
    import plugins.cron as pc
    from cron import scheduler_provider as sp
    from cron.scheduler_provider import CronScheduler

    class Unavailable(CronScheduler):
        @property
        def name(self):
            return "unavailable"

        def is_available(self):
            return False

        def start(self, stop_event, **kw):
            pass

    def fake_load_config():
        return {"cron": {"provider": "unavailable"}}

    def fake_load_scheduler(n):
        return Unavailable()

    monkeypatch.setattr(cfg, "load_config", fake_load_config)
    monkeypatch.setattr(pc, "load_cron_scheduler", fake_load_scheduler)

    resolved = sp.resolve_cron_scheduler()
    assert resolved.name == "builtin"
def test_resolve_available_provider_is_used(monkeypatch):
    """A provider that loads and reports itself available is the one
    returned, not the fallback."""
    import hermes_cli.config as cfg
    import plugins.cron as pc
    from cron import scheduler_provider as sp
    from cron.scheduler_provider import CronScheduler

    class Fake(CronScheduler):
        @property
        def name(self):
            return "fake"

        def is_available(self):
            return True

        def start(self, stop_event, **kw):
            pass

    def fake_load_config():
        return {"cron": {"provider": "fake"}}

    def fake_load_scheduler(n):
        return Fake()

    monkeypatch.setattr(cfg, "load_config", fake_load_config)
    monkeypatch.setattr(pc, "load_cron_scheduler", fake_load_scheduler)

    resolved = sp.resolve_cron_scheduler()
    assert resolved.name == "fake"
# ── Phase 4B: additive hooks (on_jobs_changed / fire_due / reconcile) ────────
def test_hooks_did_not_change_required_surface():
    """The additive hooks are not abstractmethods: the required surface is
    still exactly name + start (the Phase-1 guard)."""
    from cron.scheduler_provider import CronScheduler

    required = set(CronScheduler.__abstractmethods__)
    assert required == {"name", "start"}
def test_builtin_inherits_hook_defaults():
    """On the built-in, on_jobs_changed() and reconcile() return None, and a
    fire_due attribute is present."""
    from cron.scheduler_provider import InProcessCronScheduler

    provider = InProcessCronScheduler()

    changed_result = provider.on_jobs_changed()
    assert changed_result is None
    reconcile_result = provider.reconcile()
    assert reconcile_result is None
    # The built-in does not override fire_due; it simply isn't called for it.
    assert hasattr(provider, "fire_due")
def test_fire_due_default_claims_then_runs(monkeypatch):
    """With a won claim and an existing job, the default fire_due runs the
    job through run_one_job and returns True."""
    import cron.jobs as jobs
    import cron.scheduler as sched
    from cron.scheduler_provider import InProcessCronScheduler

    executed = []

    def claim_wins(jid):
        return True

    def fetch_job(jid):
        return {"id": jid, "name": "t"}

    def record_run(job, **kw):
        executed.append(job["id"])
        return True

    monkeypatch.setattr(jobs, "claim_job_for_fire", claim_wins, raising=False)
    monkeypatch.setattr(jobs, "get_job", fetch_job)
    monkeypatch.setattr(sched, "run_one_job", record_run)

    fired = InProcessCronScheduler().fire_due("j1")

    assert fired is True
    assert executed == ["j1"]
def test_fire_due_lost_claim_does_not_run(monkeypatch):
    """When the CAS claim is lost (another machine or a retry won), fire_due
    returns False and the job is never run."""
    import cron.jobs as jobs
    import cron.scheduler as sched
    from cron.scheduler_provider import InProcessCronScheduler

    executed = []

    def claim_loses(jid):
        return False

    def record_run(job, **kw):
        executed.append(job["id"])
        return True

    monkeypatch.setattr(jobs, "claim_job_for_fire", claim_loses, raising=False)
    monkeypatch.setattr(sched, "run_one_job", record_run)

    fired = InProcessCronScheduler().fire_due("j1")

    assert fired is False
    assert executed == []
def test_fire_due_missing_job_does_not_run(monkeypatch):
    """When the job is gone by fire time (e.g. repeat-N exhausted), fire_due
    returns False and nothing is run."""
    import cron.jobs as jobs
    import cron.scheduler as sched
    from cron.scheduler_provider import InProcessCronScheduler

    executed = []

    def claim_wins(jid):
        return True

    def job_not_found(jid):
        return None

    def record_run(job, **kw):
        executed.append(job["id"])
        return True

    monkeypatch.setattr(jobs, "claim_job_for_fire", claim_wins, raising=False)
    monkeypatch.setattr(jobs, "get_job", job_not_found)
    monkeypatch.setattr(sched, "run_one_job", record_run)

    fired = InProcessCronScheduler().fire_due("gone")

    assert fired is False
    assert executed == []

View file

@ -0,0 +1,152 @@
"""Tests for the Chronos cron-fire webhook (POST /api/cron/fire) — Phase 4E.2.
The webhook authenticates a NAS-minted JWT via the pluggable fire-verifier
(NOT API_SERVER_KEY), then runs the job via the resolved provider's fire_due in
the background, returning 202. These tests monkeypatch the verifier and
resolve_cron_scheduler; the verifier itself is tested with real crypto in
test_chronos_verify.py.
"""
import asyncio
import pytest
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer
from gateway.config import PlatformConfig
from gateway.platforms.api_server import APIServerAdapter, cors_middleware
# Dotted import path of the API server module that APIServerAdapter and
# cors_middleware are imported from above.
_MOD = "gateway.platforms.api_server"
def _make_adapter() -> APIServerAdapter:
    """Build an enabled APIServerAdapter whose configured key is "sk-secret"."""
    config = PlatformConfig(enabled=True, extra={"key": "sk-secret"})
    return APIServerAdapter(config)
def _create_app(adapter: APIServerAdapter) -> web.Application:
    """Build an aiohttp app exposing only POST /api/cron/fire for ``adapter``.

    The app uses the CORS middleware and stores the adapter under the
    ``"api_server_adapter"`` key.
    """
    application = web.Application(middlewares=[cors_middleware])
    application["api_server_adapter"] = adapter
    fire_handler = adapter._handle_cron_fire
    application.router.add_post("/api/cron/fire", fire_handler)
    return application
@pytest.fixture
def adapter():
    """Provide a freshly built APIServerAdapter to each test."""
    fresh_adapter = _make_adapter()
    return fresh_adapter
class _SpyProvider:
"""Records fire_due calls; stands in for the resolved provider."""
def __init__(self):
self.fired = []
def fire_due(self, job_id, *, adapters=None, loop=None):
self.fired.append(job_id)
return True
@pytest.mark.asyncio
async def test_valid_token_accepts_and_fires(adapter, monkeypatch):
    """A valid NAS-JWT plus {job_id} gets a 202 and fire_due runs with that id."""
    spy = _SpyProvider()

    def accepting_verifier(**kw):
        # Stands for a token that verified: claims are returned.
        return {"purpose": "cron_fire", "aud": "agent:x"}

    monkeypatch.setattr("cron.scheduler_provider.resolve_cron_scheduler", lambda: spy)
    monkeypatch.setattr(
        "plugins.cron.chronos.verify.get_fire_verifier",
        lambda: accepting_verifier,
    )

    app = _create_app(adapter)
    async with TestClient(TestServer(app)) as cli:
        resp = await cli.post(
            "/api/cron/fire",
            headers={"Authorization": "Bearer good"},
            json={"job_id": "abc123"},
        )
        assert resp.status == 202
        payload = await resp.json()
        assert payload["job_id"] == "abc123"

        # The fire runs in the background; poll briefly until it lands.
        polls = 0
        while not spy.fired and polls < 50:
            await asyncio.sleep(0.01)
            polls += 1
        assert spy.fired == ["abc123"]
@pytest.mark.asyncio
async def test_invalid_token_401_and_no_fire(adapter, monkeypatch):
    """A bad or forged token gets a 401 and fire_due is never invoked."""
    spy = _SpyProvider()

    def rejecting_verifier(**kw):
        # Stands for a failed verification.
        return None

    monkeypatch.setattr("cron.scheduler_provider.resolve_cron_scheduler", lambda: spy)
    monkeypatch.setattr(
        "plugins.cron.chronos.verify.get_fire_verifier",
        lambda: rejecting_verifier,
    )

    app = _create_app(adapter)
    async with TestClient(TestServer(app)) as cli:
        resp = await cli.post(
            "/api/cron/fire",
            headers={"Authorization": "Bearer forged"},
            json={"job_id": "abc123"},
        )
        assert resp.status == 401
        # Leave time for any (unexpected) background fire to show up.
        await asyncio.sleep(0.05)
        assert spy.fired == []
@pytest.mark.asyncio
async def test_missing_token_401(adapter, monkeypatch):
    """A request without an Authorization header gets a 401 and does not fire."""
    spy = _SpyProvider()
    monkeypatch.setattr("cron.scheduler_provider.resolve_cron_scheduler", lambda: spy)

    # The verifier is left unpatched: the real one returns None for an
    # empty token.
    app = _create_app(adapter)
    async with TestClient(TestServer(app)) as cli:
        body = {"job_id": "abc123"}
        resp = await cli.post("/api/cron/fire", json=body)
        assert resp.status == 401
        assert spy.fired == []
@pytest.mark.asyncio
async def test_missing_job_id_400(adapter, monkeypatch):
    """A valid token with no job_id in the body gets a 400 and does not fire."""
    spy = _SpyProvider()

    def accepting_verifier(**kw):
        return {"purpose": "cron_fire"}

    monkeypatch.setattr("cron.scheduler_provider.resolve_cron_scheduler", lambda: spy)
    monkeypatch.setattr(
        "plugins.cron.chronos.verify.get_fire_verifier",
        lambda: accepting_verifier,
    )

    app = _create_app(adapter)
    async with TestClient(TestServer(app)) as cli:
        resp = await cli.post(
            "/api/cron/fire",
            headers={"Authorization": "Bearer good"},
            json={},
        )
        assert resp.status == 400
        assert spy.fired == []
@pytest.mark.asyncio
async def test_fire_does_not_require_api_server_key(adapter, monkeypatch):
    """The fire endpoint is gated by the NAS-JWT, NOT by API_SERVER_KEY: a
    request carrying only a valid fire token is accepted and fires."""
    spy = _SpyProvider()

    def accepting_verifier(**kw):
        return {"purpose": "cron_fire"}

    monkeypatch.setattr("cron.scheduler_provider.resolve_cron_scheduler", lambda: spy)
    monkeypatch.setattr(
        "plugins.cron.chronos.verify.get_fire_verifier",
        lambda: accepting_verifier,
    )

    app = _create_app(adapter)
    async with TestClient(TestServer(app)) as cli:
        # The bearer value is the FIRE token, not the API_SERVER_KEY "sk-secret".
        resp = await cli.post(
            "/api/cron/fire",
            headers={"Authorization": "Bearer nas-jwt"},
            json={"job_id": "j9"},
        )
        assert resp.status == 202

        polls = 0
        while not spy.fired and polls < 50:
            await asyncio.sleep(0.01)
            polls += 1
        assert spy.fired == ["j9"]

View file

@ -0,0 +1,142 @@
"""Tests for the Chronos cron-fire webhook ON THE DASHBOARD APP (web_server).
Regression guard for the relocation bug: the fire webhook MUST live on the
dashboard FastAPI app (`hermes_cli.web_server.app`) — the agent's public HTTP
surface on hosted deployments — not only on the aiohttp APIServerAdapter (which
hosted agents don't expose). It must:
- be a registered route on the dashboard app,
- be in PUBLIC_API_PATHS so the dashboard cookie gate doesn't 401 it before
the JWT verifier runs,
- reject a bad/missing NAS-JWT with 401 (the JWT is the real gate),
- 400 on missing job_id,
- on a valid token, resolve the job's profile and run fire_due in the
background, returning 202.
"""
import pytest
from starlette.testclient import TestClient
from hermes_cli import web_server
from hermes_cli.dashboard_auth.public_paths import PUBLIC_API_PATHS
# Marks "attribute was not set at all", as distinct from "attribute was None".
_UNSET = object()


def _client(auth_required: bool):
    """Build a TestClient for the dashboard app with the auth gate forced.

    Sets ``app.state.auth_required`` to *auth_required* and
    ``app.state.bound_host`` to ``None``.

    Returns:
        ``(client, prev_auth, prev_host)``; the two ``prev_*`` values are
        opaque tokens to hand back to :func:`_restore`.
    """
    state = web_server.app.state
    prev_auth = getattr(state, "auth_required", _UNSET)
    prev_host = getattr(state, "bound_host", _UNSET)
    state.auth_required = auth_required
    state.bound_host = None
    client = TestClient(web_server.app)
    return client, prev_auth, prev_host


def _restore_state_attr(name, previous):
    """Put one ``app.state`` attribute back to what ``_client`` captured."""
    state = web_server.app.state
    if previous is _UNSET:
        # The attribute did not exist before the test: remove it again.
        if hasattr(state, name):
            delattr(state, name)
    else:
        # Includes a genuine ``None`` value, which must be restored, not deleted.
        setattr(state, name, previous)


def _restore(prev_auth, prev_host):
    """Undo the ``app.state`` changes made by :func:`_client`."""
    _restore_state_attr("auth_required", prev_auth)
    _restore_state_attr("bound_host", prev_host)
def test_route_registered_on_dashboard_app():
    """``/api/cron/fire`` must be among the dashboard app's registered routes
    (the hosted-agent public surface), not only on the aiohttp adapter."""
    registered = set()
    for route in web_server.app.routes:
        # Only routes that expose a ``path`` attribute are considered.
        if hasattr(route, "path"):
            registered.add(route.path)
    assert "/api/cron/fire" in registered
def test_fire_path_is_public():
    """The fire path must be listed in PUBLIC_API_PATHS so the dashboard cookie
    gate lets the bearer-JWT callback through to the verifier."""
    fire_path = "/api/cron/fire"
    assert fire_path in PUBLIC_API_PATHS
def test_bad_token_401(monkeypatch):
    """A token the verifier rejects yields 401 with the auth gate ENGAGED.

    Shows the route is reachable past the cookie gate and that the verifier is
    the effective gate; the fire helper must not be invoked.
    """
    fired = []

    def _rejecting_verifier(**kw):
        return None  # verification fails

    def _record_fire(p, j):
        fired.append((p, j))

    monkeypatch.setattr(
        "plugins.cron.chronos.verify.get_fire_verifier",
        lambda: _rejecting_verifier,
    )
    monkeypatch.setattr(web_server, "_find_cron_job_profile", lambda jid: "default")
    monkeypatch.setattr(web_server, "_fire_cron_job_for_profile", _record_fire)
    client, pa, ph = _client(auth_required=True)
    try:
        response = client.post(
            "/api/cron/fire",
            headers={"Authorization": "Bearer forged"},
            json={"job_id": "abc"},
        )
        assert response.status_code == 401
        assert fired == []
    finally:
        _restore(pa, ph)
        client.close()
def test_missing_job_id_400(monkeypatch):
    """An accepted token with an empty JSON body (no job_id) yields 400."""
    def _accepting_verifier(**kw):
        return {"purpose": "cron_fire"}

    monkeypatch.setattr(
        "plugins.cron.chronos.verify.get_fire_verifier",
        lambda: _accepting_verifier,
    )
    client, pa, ph = _client(auth_required=False)
    try:
        response = client.post(
            "/api/cron/fire",
            headers={"Authorization": "Bearer good"},
            json={},
        )
        assert response.status_code == 400
    finally:
        _restore(pa, ph)
        client.close()
def test_unknown_job_200_gone(monkeypatch):
    """An accepted token for a job found in no profile yields 200 with
    ``status == "gone"`` (so the caller need not retry a fire for a
    cancelled/completed job)."""
    def _accepting_verifier(**kw):
        return {"purpose": "cron_fire"}

    monkeypatch.setattr(
        "plugins.cron.chronos.verify.get_fire_verifier",
        lambda: _accepting_verifier,
    )
    monkeypatch.setattr(web_server, "_find_cron_job_profile", lambda jid: None)
    client, pa, ph = _client(auth_required=False)
    try:
        response = client.post(
            "/api/cron/fire",
            headers={"Authorization": "Bearer good"},
            json={"job_id": "ghost"},
        )
        assert response.status_code == 200
        payload = response.json()
        assert payload.get("status") == "gone"
    finally:
        _restore(pa, ph)
        client.close()
def test_valid_token_accepts_and_fires(monkeypatch):
    """An accepted token for a known job yields 202 echoing the job_id, and the
    fire helper is invoked once with the resolved profile."""
    fired = []

    def _accepting_verifier(**kw):
        return {"purpose": "cron_fire", "aud": "agent:x"}

    def _record_fire(p, j):
        fired.append((p, j))
        return True

    monkeypatch.setattr(
        "plugins.cron.chronos.verify.get_fire_verifier",
        lambda: _accepting_verifier,
    )
    monkeypatch.setattr(web_server, "_find_cron_job_profile", lambda jid: "default")
    monkeypatch.setattr(web_server, "_fire_cron_job_for_profile", _record_fire)
    client, pa, ph = _client(auth_required=False)
    try:
        response = client.post(
            "/api/cron/fire",
            headers={"Authorization": "Bearer good"},
            json={"job_id": "j1"},
        )
        assert response.status_code == 202
        assert response.json()["job_id"] == "j1"
    finally:
        _restore(pa, ph)
        client.close()
    # background task ran the fire for the resolved profile
    assert fired == [("default", "j1")]

View file

@ -0,0 +1,203 @@
"""Unit tests for the Chronos NAS-mediated cron provider (Phase 4D).
All NAS calls are mocked — ZERO live network. These prove:
- is_available is config-only (no network), false without config.
- one-shot arming sends the right provision payload (incl. sub-minute fires —
the agent owns the time, so there's no 1-minute floor).
- reconcile arms missing, cancels orphaned, skips paused.
- fire_due re-arms the next one-shot after a successful run, and repeat-N
(job gone) stops re-arming.
"""
import pytest
@pytest.fixture
def temp_home(tmp_path, monkeypatch):
    """Point ``HERMES_HOME`` at pytest's per-test temp directory and yield it."""
    home_dir = str(tmp_path)
    monkeypatch.setenv("HERMES_HOME", home_dir)
    yield tmp_path
@pytest.fixture
def chronos(monkeypatch):
    """Yield ``(provider, fake_client)``: a ChronosCronScheduler whose
    ``_client`` is an in-memory stub that records provision/cancel calls."""
    from plugins.cron.chronos import ChronosCronScheduler

    class FakeClient:
        """Stand-in NAS client: captures calls, performs no network I/O."""

        def __init__(self):
            self.provisions = []
            self.cancels = []
            self._armed = []

        def provision(self, *, job_id, fire_at, agent_callback_url, dedup_key):
            captured = dict(
                job_id=job_id,
                fire_at=fire_at,
                agent_callback_url=agent_callback_url,
                dedup_key=dedup_key,
            )
            self.provisions.append(captured)
            return {"schedule_id": f"sched-{job_id}"}

        def cancel(self, *, job_id):
            self.cancels.append(job_id)
            return {}

        def list_armed(self):
            return list(self._armed)

    def _fake_cfg(*k, default=""):
        # callback_url is read via _cfg; answer by the last key segment so no
        # real config is needed.
        if k[-1] == "callback_url":
            return "https://agent.example/"
        return "https://portal.test"

    prov = ChronosCronScheduler()
    fake = FakeClient()
    prov._client = fake
    monkeypatch.setattr("plugins.cron.chronos._cfg", _fake_cfg)
    return prov, fake
# -- is_available -------------------------------------------------------------
def test_is_available_false_without_config(temp_home, monkeypatch):
    """When every ``_cfg`` lookup returns "", the provider is unavailable."""
    from plugins.cron.chronos import ChronosCronScheduler

    def _empty_cfg(*k, default=""):
        return ""

    monkeypatch.setattr("plugins.cron.chronos._cfg", _empty_cfg)
    provider = ChronosCronScheduler()
    assert provider.is_available() is False
def test_is_available_true_with_config_and_token(temp_home, monkeypatch):
    """With config values present and an access token in the auth state, the
    provider reports itself available."""
    import plugins.cron.chronos as mod
    from plugins.cron.chronos import ChronosCronScheduler

    def _configured(*k, default=""):
        return "https://x"

    def _auth_state(pid):
        return {"access_token": "tok"}

    monkeypatch.setattr(mod, "_cfg", _configured)
    monkeypatch.setattr("hermes_cli.auth.get_provider_auth_state", _auth_state)
    provider = ChronosCronScheduler()
    assert provider.is_available() is True
def test_is_available_makes_no_network(temp_home, monkeypatch):
    """``is_available`` must answer without calling ``_get_client`` (i.e.
    without building the NAS client)."""
    import plugins.cron.chronos as mod
    from plugins.cron.chronos import ChronosCronScheduler

    def _configured(*k, default=""):
        return "https://x"

    def _auth_state(pid):
        return {"access_token": "tok"}

    def explode():
        raise AssertionError("is_available must not build the NAS client")

    monkeypatch.setattr(mod, "_cfg", _configured)
    monkeypatch.setattr("hermes_cli.auth.get_provider_auth_state", _auth_state)
    provider = ChronosCronScheduler()
    monkeypatch.setattr(provider, "_get_client", explode)
    # Passing here means _get_client was never invoked.
    assert provider.is_available() is True
# -- arming -------------------------------------------------------------------
def test_arm_one_shot_sends_provision(chronos):
    """Arming a job issues exactly one provision call carrying the job id, the
    fire time, an ``id:fire_time`` dedup key and the configured callback URL."""
    prov, fake = chronos
    fire_time = "2026-06-18T12:00:00+00:00"
    prov._arm_one_shot({"id": "j1", "next_run_at": fire_time})
    assert len(fake.provisions) == 1
    sent = fake.provisions[0]
    assert sent["job_id"] == "j1"
    assert sent["fire_at"] == "2026-06-18T12:00:00+00:00"
    assert sent["dedup_key"] == "j1:2026-06-18T12:00:00+00:00"
    assert sent["agent_callback_url"] == "https://agent.example/"
def test_arm_one_shot_preserves_sub_minute_fire(chronos):
    """A fire time with non-zero seconds is passed through unchanged — there is
    no rounding to a 1-minute floor."""
    prov, fake = chronos
    job = {"id": "j2", "next_run_at": "2026-06-18T12:00:30+00:00"}
    prov._arm_one_shot(job)
    first_call = fake.provisions[0]
    assert first_call["fire_at"] == "2026-06-18T12:00:30+00:00"
def test_arm_one_shot_noop_without_next_run(chronos):
    """A job whose ``next_run_at`` is None triggers no provision call."""
    prov, fake = chronos
    job = {"id": "j3", "next_run_at": None}
    prov._arm_one_shot(job)
    assert fake.provisions == []
# -- reconcile ----------------------------------------------------------------
def test_reconcile_arms_all_enabled(temp_home, chronos, monkeypatch):
    """Reconcile provisions every enabled, scheduled job and cancels nothing."""
    prov, fake = chronos
    jobs = [
        {"id": "a", "enabled": True, "next_run_at": "2026-06-18T12:00:00+00:00", "state": "scheduled"},
        {"id": "b", "enabled": True, "next_run_at": "2026-06-18T12:05:00+00:00", "state": "scheduled"},
    ]
    monkeypatch.setattr("cron.jobs.load_jobs", lambda: jobs)
    # Return None for an unknown id, like the other reconcile stubs in this
    # file, instead of leaking StopIteration out of next().
    monkeypatch.setattr(
        "cron.jobs.get_job",
        lambda jid: next((j for j in jobs if j["id"] == jid), None),
    )
    prov.reconcile()
    assert {p["job_id"] for p in fake.provisions} == {"a", "b"}
    assert fake.cancels == []
def test_reconcile_cancels_orphan_arms_desired(temp_home, chronos, monkeypatch):
    """Reconcile cancels an armed entry with no matching job and provisions the
    job that is actually wanted."""
    prov, fake = chronos
    # NAS already has a stale arm for deleted job "gone".
    prov._armed = {"gone": "2026-06-18T11:00:00+00:00"}
    jobs = [
        {
            "id": "a",
            "enabled": True,
            "next_run_at": "2026-06-18T12:00:00+00:00",
            "state": "scheduled",
        },
    ]

    def _lookup(jid):
        return next((j for j in jobs if j["id"] == jid), None)

    monkeypatch.setattr("cron.jobs.load_jobs", lambda: jobs)
    monkeypatch.setattr("cron.jobs.get_job", _lookup)
    prov.reconcile()
    provisioned_ids = [p["job_id"] for p in fake.provisions]
    assert provisioned_ids == ["a"]
    assert fake.cancels == ["gone"]
def test_reconcile_skips_paused(temp_home, chronos, monkeypatch):
    """A job whose state is ``paused`` is not provisioned by reconcile."""
    prov, fake = chronos
    jobs = [
        {
            "id": "p",
            "enabled": True,
            "next_run_at": "2026-06-18T12:00:00+00:00",
            "state": "paused",
        },
    ]

    def _lookup(jid):
        return next((j for j in jobs if j["id"] == jid), None)

    monkeypatch.setattr("cron.jobs.load_jobs", lambda: jobs)
    monkeypatch.setattr("cron.jobs.get_job", _lookup)
    prov.reconcile()
    assert fake.provisions == []
def test_reconcile_skips_already_armed_same_time(temp_home, chronos, monkeypatch):
    """A job already armed at its current ``next_run_at`` is not re-provisioned."""
    prov, fake = chronos
    armed_at = "2026-06-18T12:00:00+00:00"
    prov._armed = {"a": armed_at}
    jobs = [
        {
            "id": "a",
            "enabled": True,
            "next_run_at": "2026-06-18T12:00:00+00:00",
            "state": "scheduled",
        },
    ]

    def _only_job(jid):
        return jobs[0]

    monkeypatch.setattr("cron.jobs.load_jobs", lambda: jobs)
    monkeypatch.setattr("cron.jobs.get_job", _only_job)
    prov.reconcile()
    # already armed at the same time → no re-arm
    assert fake.provisions == []
# -- fire_due re-arm ----------------------------------------------------------
def test_fire_due_rearms_next_oneshot(chronos, monkeypatch):
    """After the base ``fire_due`` reports a run, the provider provisions a new
    one-shot at the job's next ``next_run_at``."""
    prov, fake = chronos

    def _base_fire_ran(self, jid, **kw):
        # Stand-in for the ABC default: pretend the job ran.
        return True

    def _job_after_run(jid):
        return {"id": jid, "enabled": True, "next_run_at": "2026-06-18T12:05:00+00:00"}

    monkeypatch.setattr("cron.scheduler_provider.CronScheduler.fire_due", _base_fire_ran)
    monkeypatch.setattr("cron.jobs.get_job", _job_after_run)
    assert prov.fire_due("j1") is True
    rearmed_ids = [p["job_id"] for p in fake.provisions]
    assert rearmed_ids == ["j1"]
    assert fake.provisions[0]["fire_at"] == "2026-06-18T12:05:00+00:00"
def test_fire_due_no_rearm_when_job_gone(chronos, monkeypatch):
    """If the job no longer exists after the run (``get_job`` returns None, as
    for an exhausted repeat-N or completed one-shot), nothing is re-armed."""
    prov, fake = chronos

    def _base_fire_ran(self, jid, **kw):
        return True

    def _job_missing(jid):
        return None

    monkeypatch.setattr("cron.scheduler_provider.CronScheduler.fire_due", _base_fire_ran)
    monkeypatch.setattr("cron.jobs.get_job", _job_missing)
    assert prov.fire_due("j1") is True
    assert fake.provisions == []
def test_fire_due_no_rearm_when_claim_lost(chronos, monkeypatch):
    """When the base ``fire_due`` returns False (the run didn't happen, e.g. the
    claim was lost), the provider returns False and does not re-arm."""
    prov, fake = chronos

    def _base_fire_skipped(self, jid, **kw):
        return False

    monkeypatch.setattr(
        "cron.scheduler_provider.CronScheduler.fire_due", _base_fire_skipped
    )
    outcome = prov.fire_due("j1")
    assert outcome is False
    assert fake.provisions == []

View file

@ -0,0 +1,182 @@
"""Tests for the Chronos inbound cron-fire JWT verifier (Phase 4E.1).
These exercise REAL RS256 signing/verification (PyJWT[crypto] is a declared
dependency) against an inline PEM public key — no mocking of the crypto, since
this is a security boundary. The JWKS-URL path is covered separately by mocking
PyJWKClient's key resolution.
"""
import time
import pytest
@pytest.fixture(scope="module")
def rsa_keys():
    """Generate one 2048-bit RSA keypair per module.

    Returns ``(private_pem, public_pem)`` as text: the private key as
    unencrypted PKCS#8, the public key as SubjectPublicKeyInfo.
    """
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    private_bytes = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
    public_bytes = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return private_bytes.decode(), public_bytes.decode()
def _mint(priv, claims):
    """Sign *claims* with the PEM private key *priv* using RS256 and return
    the encoded token."""
    import jwt

    signed_token = jwt.encode(claims, priv, algorithm="RS256")
    return signed_token
AUD = "agent:inst-123"
ISS = "https://portal.nousresearch.com"
def _base_claims(**over):
now = int(time.time())
c = {
"aud": AUD,
"iss": ISS,
"purpose": "cron_fire",
"iat": now,
"nbf": now - 5,
"exp": now + 300,
}
c.update(over)
return c
def test_valid_token_returns_claims(rsa_keys):
    """A token signed by the matching key with default claims verifies and its
    claims are returned."""
    from plugins.cron.chronos.verify import verify_nas_fire_token

    private_pem, public_pem = rsa_keys
    signed = _mint(private_pem, _base_claims())
    verified = verify_nas_fire_token(
        token=signed,
        expected_audience=AUD,
        jwks_or_key=public_pem,
        issuer=ISS,
    )
    assert verified is not None
    assert verified["purpose"] == "cron_fire"
    assert verified["aud"] == AUD
def test_wrong_audience_rejected(rsa_keys):
    """A token minted for a different audience is rejected (None)."""
    from plugins.cron.chronos.verify import verify_nas_fire_token

    private_pem, public_pem = rsa_keys
    foreign_claims = _base_claims(aud="agent:someone-else")
    signed = _mint(private_pem, foreign_claims)
    outcome = verify_nas_fire_token(
        token=signed,
        expected_audience=AUD,
        jwks_or_key=public_pem,
        issuer=ISS,
    )
    assert outcome is None
def test_missing_purpose_rejected(rsa_keys):
    """A token lacking the ``purpose`` claim (e.g. a general agent JWT) is
    rejected, so it cannot be used to fire jobs."""
    from plugins.cron.chronos.verify import verify_nas_fire_token

    private_pem, public_pem = rsa_keys
    claims = _base_claims()
    claims.pop("purpose")
    signed = _mint(private_pem, claims)
    outcome = verify_nas_fire_token(
        token=signed,
        expected_audience=AUD,
        jwks_or_key=public_pem,
        issuer=ISS,
    )
    assert outcome is None
def test_wrong_purpose_rejected(rsa_keys):
    """A token whose ``purpose`` is not ``cron_fire`` is rejected (None)."""
    from plugins.cron.chronos.verify import verify_nas_fire_token

    private_pem, public_pem = rsa_keys
    other_purpose = _base_claims(purpose="inference")
    signed = _mint(private_pem, other_purpose)
    outcome = verify_nas_fire_token(
        token=signed,
        expected_audience=AUD,
        jwks_or_key=public_pem,
        issuer=ISS,
    )
    assert outcome is None
def test_expired_token_rejected(rsa_keys):
    """A token whose ``exp`` lies 600 s in the past is rejected (None)."""
    from plugins.cron.chronos.verify import verify_nas_fire_token

    private_pem, public_pem = rsa_keys
    now = int(time.time())
    stale_window = {"iat": now - 1000, "nbf": now - 1000, "exp": now - 600}
    signed = _mint(private_pem, _base_claims(**stale_window))
    outcome = verify_nas_fire_token(
        token=signed,
        expected_audience=AUD,
        jwks_or_key=public_pem,
        issuer=ISS,
    )
    assert outcome is None
def test_wrong_issuer_rejected(rsa_keys):
    """A token whose ``iss`` differs from the expected issuer is rejected."""
    from plugins.cron.chronos.verify import verify_nas_fire_token

    private_pem, public_pem = rsa_keys
    rogue_claims = _base_claims(iss="https://evil.example")
    signed = _mint(private_pem, rogue_claims)
    outcome = verify_nas_fire_token(
        token=signed,
        expected_audience=AUD,
        jwks_or_key=public_pem,
        issuer=ISS,
    )
    assert outcome is None
def test_tampered_signature_rejected(rsa_keys):
    """A token signed by a DIFFERENT private key fails verification against
    the real public key."""
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa
    from plugins.cron.chronos.verify import verify_nas_fire_token

    _, public_pem = rsa_keys
    attacker_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    attacker_bytes = attacker_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )
    forged = _mint(attacker_bytes.decode(), _base_claims())
    # Verified against the REAL public key → signature mismatch → None.
    outcome = verify_nas_fire_token(
        token=forged,
        expected_audience=AUD,
        jwks_or_key=public_pem,
        issuer=ISS,
    )
    assert outcome is None
def test_no_key_configured_refuses(rsa_keys):
    """With ``jwks_or_key=None`` the verifier refuses even a well-formed token
    (it must never fall back to an unsigned decode)."""
    from plugins.cron.chronos.verify import verify_nas_fire_token

    private_pem, _ = rsa_keys
    signed = _mint(private_pem, _base_claims())
    outcome = verify_nas_fire_token(
        token=signed,
        expected_audience=AUD,
        jwks_or_key=None,
    )
    assert outcome is None
def test_empty_token_refused(rsa_keys):
    """An empty token string is refused (None) even with a key configured."""
    from plugins.cron.chronos.verify import verify_nas_fire_token

    _, public_pem = rsa_keys
    outcome = verify_nas_fire_token(
        token="", expected_audience=AUD, jwks_or_key=public_pem
    )
    assert outcome is None
def test_jwks_url_path_resolves_key(rsa_keys, monkeypatch):
    """When ``jwks_or_key`` is a JWKS URL, the signing key is resolved through
    ``jwt.PyJWKClient`` (replaced here by a stub that returns the test key)."""
    from plugins.cron.chronos.verify import verify_nas_fire_token

    private_pem, public_pem = rsa_keys
    signed = _mint(private_pem, _base_claims())

    class FakeKey:
        key = public_pem

    class FakeJWKClient:
        def __init__(self, url):
            # The verifier must hand the configured URL to the client as-is.
            assert url == "https://portal.nousresearch.com/.well-known/jwks.json"

        def get_signing_key_from_jwt(self, tok):
            return FakeKey()

    monkeypatch.setattr("jwt.PyJWKClient", FakeJWKClient)
    verified = verify_nas_fire_token(
        token=signed,
        expected_audience=AUD,
        jwks_or_key="https://portal.nousresearch.com/.well-known/jwks.json",
        issuer=ISS,
    )
    assert verified is not None
    assert verified["purpose"] == "cron_fire"
def test_get_fire_verifier_returns_nas_verifier():
    """``get_fire_verifier()`` hands back the ``verify_nas_fire_token``
    function object itself."""
    from plugins.cron.chronos.verify import get_fire_verifier, verify_nas_fire_token

    resolved = get_fire_verifier()
    assert resolved is verify_nas_fire_token

View file

@ -33,6 +33,16 @@ from cron.jobs import (
)
def _notify_provider_jobs_changed_safe() -> None:
"""Tell the active cron scheduler provider the job set changed (no-op for
the built-in). Best-effort never lets a provider error break the tool."""
try:
from cron.scheduler import _notify_provider_jobs_changed
_notify_provider_jobs_changed()
except Exception:
pass
# ---------------------------------------------------------------------------
# Cron prompt scanning
# ---------------------------------------------------------------------------
@ -549,6 +559,7 @@ def cronjob(
workdir=_normalize_optional_job_value(workdir),
no_agent=_no_agent,
)
_notify_provider_jobs_changed_safe()
return json.dumps(
{
"success": True,
@ -604,6 +615,7 @@ def cronjob(
removed = remove_job(job_id)
if not removed:
return tool_error(f"Failed to remove job '{job_id}'", success=False)
_notify_provider_jobs_changed_safe()
return json.dumps(
{
"success": True,
@ -619,10 +631,12 @@ def cronjob(
if normalized == "pause":
updated = pause_job(job_id, reason=reason)
_notify_provider_jobs_changed_safe()
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
if normalized == "resume":
updated = resume_job(job_id)
_notify_provider_jobs_changed_safe()
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
if normalized in {"run", "run_now", "trigger"}:
@ -711,6 +725,7 @@ def cronjob(
if not updates:
return tool_error("No updates provided.", success=False)
updated = update_job(job_id, updates)
_notify_provider_jobs_changed_safe()
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
return tool_error(f"Unknown cron action '{action}'", success=False)

View file

@ -102,10 +102,75 @@ tick()
### Gateway Integration
In gateway mode, the scheduler runs in a dedicated background thread (`_start_cron_ticker` in `gateway/run.py`) that calls `scheduler.tick()` every 60 seconds alongside message handling.
In gateway mode, the cron **trigger** (the part that decides *when* a due job
fires — "Axis B") is selected through a pluggable `CronScheduler` provider. The
gateway calls `resolve_cron_scheduler()` (`cron/scheduler_provider.py`) and runs
the resolved provider's `start()` in a dedicated background thread, alongside a
separate gateway-housekeeping thread.
The active provider is chosen by the `cron.provider` config key:
- **empty (default)** → the built-in `InProcessCronScheduler`, which runs the
historical in-process loop calling `scheduler.tick()` every 60 seconds. This
is byte-identical to the pre-provider behavior.
- **a named provider** (e.g. `chronos`, a managed-cron provider for
scale-to-zero deployments) → discovered from `plugins/cron/<name>/` or
`$HERMES_HOME/plugins/<name>/`.
If a named provider is missing, fails to load, or reports `is_available() ==
False`, the resolver falls back to the built-in with a warning — **cron is
never left without a trigger.** The built-in provider lives in core
(`cron/scheduler_provider.py`), not in `plugins/`, so the fallback can't be
accidentally removed.
What "firing" *means* (job execution + delivery) is unchanged and shared by all
providers — it stays in `scheduler.run_job()` / `scheduler._deliver_result()`.
A provider only controls the trigger, never execution.
In CLI mode, cron jobs only fire when `hermes cron` commands are run or during active CLI sessions.
### Managed cron (Chronos) for scale-to-zero
Hosted gateways can run the **Chronos** provider (`cron.provider: chronos`)
instead of the built-in ticker. Chronos lets an idle gateway **scale to zero**
and still fire cron jobs: rather than a 60-second in-process loop (which would
keep the process awake), it asks Nous infrastructure to arm exactly **one
managed one-shot per job at that job's real next-fire time**. At fire time Nous
calls the gateway back over an authenticated webhook (`POST /api/cron/fire`);
the gateway runs the job through the same `run_one_job` path as the built-in,
then re-arms the next one-shot. Between fires the process can be fully stopped —
it wakes only on a genuine fire, never on a periodic timer.
The flow (the managed scheduler is provided by Nous; the agent holds no
scheduler credentials):
```
create/update a cron job
→ Chronos asks Nous to arm a one-shot at the job's next_run_at
(authenticated with the agent's existing Nous token)
→ at fire time Nous calls the gateway: POST {callback_url}/api/cron/fire
(authenticated with a short-lived, purpose-scoped Nous-minted JWT)
→ the gateway verifies the token, claims the job (store compare-and-set so
multi-replica deployments fire at-most-once), runs it, and re-arms the next
one-shot
```
Config (all non-secret; on hosted agents Nous sets these at provision time):
| key | meaning |
|---|---|
| `cron.provider` | `chronos` to activate (empty = built-in ticker) |
| `cron.chronos.portal_url` | Nous base URL (arming + the fire-token issuer) |
| `cron.chronos.callback_url` | the gateway's own public base URL for inbound fires |
| `cron.chronos.expected_audience` | this agent's fire-token audience |
| `cron.chronos.nas_jwks_url` | key set for verifying the inbound fire token |
If Chronos is misconfigured or the agent isn't logged into Nous,
`resolve_cron_scheduler()` falls back to the built-in ticker (logged warning) —
cron never loses its trigger. Recurring jobs re-arm after each fire; `repeat`-N
jobs stop cleanly when the count is exhausted (no orphaned one-shot). The full
agent↔Nous wire contract lives in `docs/chronos-managed-cron-contract.md`.
### Fresh Session Isolation
Each cron job runs in a completely fresh agent session:

View file

@ -533,6 +533,15 @@ hermes cron <list|create|edit|pause|resume|run|remove|status|tick>
| `status` | Check whether the cron scheduler is running. |
| `tick` | Run due jobs once and exit. |
The cron **trigger** is pluggable via the `cron.provider` config key. Empty
(the default) uses the built-in in-process ticker. Set it to `chronos` to use
the NAS-managed provider for scale-to-zero hosted gateways, which is configured
via the `cron.chronos.*` keys (`portal_url`, `callback_url`,
`expected_audience`, `nas_jwks_url`), or set it to the name of a custom provider
under `plugins/cron/<name>/` or `$HERMES_HOME/plugins/<name>/`. An unknown or
unavailable provider falls back to the built-in, so cron is never left without
a trigger. See the
[cron internals](../developer-guide/cron-internals.md#gateway-integration) doc.
## `hermes kanban`
```bash