From 3fc7b624d860aca1004155cbe8a09a083bbef30a Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 18 Jun 2026 14:46:33 +1000 Subject: [PATCH] feat(cron,gateway): NAS-JWT fire verifier + /api/cron/fire webhook (Chronos) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 4E (E.1 + E.2). The inbound side of Chronos: NAS POSTs the agent when a one-shot fires; the agent verifies a NAS-minted JWT and runs the job. E.1 — plugins/cron/chronos/verify.py: - verify_nas_fire_token(token, expected_audience, jwks_or_key, issuer): verifies signature against the NAS JWKS (RS/ES family; symmetric rejected), aud == this agent, exp/nbf, iss, and purpose == "cron_fire" (so a general agent JWT can't be replayed against the fire endpoint). Returns claims or None; never raises. Crypto delegated to PyJWT[crypto] (already a declared dep) — no hand-rolled JWT, no new dependency. No key configured → refuse (never unsigned-decode a security boundary). - get_fire_verifier(): pluggable indirection so the DQ-4 escape hatch (direct per-job cron-key) can swap in with no handler change. E.2 — gateway/platforms/api_server.py: - POST /api/cron/fire (registered only when _CRON_AVAILABLE). Authenticated by the NAS-JWT via get_fire_verifier() — NOT API_SERVER_KEY (NAS holds no API key; this is the only inbound that triggers remote job execution, so it gets its own purpose-scoped check). Verifier args come from cron.chronos.* config. 401 on bad/missing/forged token. 400 on missing job_id. On success: 202 + fire_due runs in the background (so a long agent turn never trips NAS's HTTP timeout); the store CAS claim inside fire_due de-dupes a scheduler retry. Tests: - test_chronos_verify (11): REAL RS256 signing — valid→claims, wrong-aud, missing/wrong purpose, expired, wrong-iss, tampered-signature (attacker key), no-key-refuse, empty-token, JWKS-URL key resolution, get_fire_verifier. 
async def _handle_cron_fire(self, request: "web.Request") -> "web.Response":
    """POST /api/cron/fire — Chronos managed-cron fire webhook (NAS → agent).

    Authenticated by a NAS-minted JWT (verified via the pluggable
    fire-verifier), NOT API_SERVER_KEY — NAS holds no API server key, and
    this is the only inbound that can trigger remote job execution, so it
    gets its own purpose-scoped token check. The verifier arguments are read
    from the ``cron.chronos.*`` config keys.

    Responses:
        401: the verifier returned ``None`` (missing or invalid bearer token).
        400: the body is not a JSON object, or ``job_id`` is absent or is
            not a non-empty string.
        202: accepted. ``fire_due`` runs in a worker thread so a long agent
            turn never trips NAS's HTTP timeout. The store CAS claim inside
            fire_due guards against double-fire on a NAS/scheduler retry.
    """
    from hermes_cli.config import cfg_get, load_config
    from plugins.cron.chronos.verify import get_fire_verifier

    # The auth scheme is case-insensitive (RFC 6750 / RFC 7235), so accept
    # "bearer" as well as "Bearer".
    scheme, _, credentials = request.headers.get("Authorization", "").partition(" ")
    token = credentials.strip() if scheme.lower() == "bearer" else ""

    cfg = load_config()
    verifier = get_fire_verifier()
    # The default verifier may fetch the NAS JWKS over HTTP, which blocks;
    # run it in a worker thread so the event loop keeps serving requests.
    claims = await asyncio.to_thread(
        verifier,
        token=token,
        expected_audience=cfg_get(cfg, "cron", "chronos", "expected_audience", default=""),
        jwks_or_key=cfg_get(cfg, "cron", "chronos", "nas_jwks_url", default="") or None,
        issuer=cfg_get(cfg, "cron", "chronos", "portal_url", default="") or None,
    )
    if claims is None:
        logger.warning(
            "cron fire: rejected invalid token: %s",
            self._request_audit_log_suffix(request),
        )
        return web.json_response({"error": "invalid fire token"}, status=401)

    # NOTE(review): the claims are not consulted beyond the None check, so any
    # valid fire token for this agent can fire any job_id. Confirm whether the
    # NAS token carries a per-job claim that should be matched here.

    try:
        body = await request.json()
    except Exception:
        body = None
    # A JSON array/string/number is valid JSON but has no .get(); treat it
    # like a missing job_id instead of crashing with a 500.
    job_id = body.get("job_id") if isinstance(body, dict) else None
    if not isinstance(job_id, str) or not job_id.strip():
        return web.json_response({"error": "missing job_id"}, status=400)

    from cron.scheduler_provider import resolve_cron_scheduler
    provider = resolve_cron_scheduler()

    loop = asyncio.get_running_loop()
    # Fire in the background (202 immediately). fire_due claims via the
    # store CAS, so a retry while this is in flight is de-duped.
    task = asyncio.create_task(
        asyncio.to_thread(provider.fire_due, job_id, adapters=None, loop=loop)
    )

    def _log_fire_failure(done: "asyncio.Task") -> None:
        # Nobody awaits the task, so surface a failure here; otherwise it
        # would only show up as "Task exception was never retrieved".
        if done.cancelled():
            return
        exc = done.exception()
        if exc is not None:
            logger.error("cron fire: job %s failed: %r", job_id, exc)

    task.add_done_callback(_log_fire_failure)

    try:
        # Hold a strong reference until the task finishes.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
    except (TypeError, AttributeError):
        # Best-effort: an adapter without a usable task registry still fires.
        logger.debug("cron fire: no background-task registry; task left untracked")

    return web.json_response({"status": "accepted", "job_id": job_id}, status=202)
logger = logging.getLogger("cron.chronos.verify")

# The purpose claim that scopes a token to the fire endpoint. A general agent
# JWT (without this claim) must NOT be replayable against /api/cron/fire.
_FIRE_PURPOSE = "cron_fire"

# One PyJWKClient per (client class, JWKS URL), reused across calls so the
# client's own JWKS cache is effective instead of refetching on every fire.
# Keyed on the class as well so a swapped-in client class (e.g. in tests) is
# never handed a stale instance of the previous class.
_JWK_CLIENTS: Dict[Any, Any] = {}


def _jwk_client_for(url: str) -> Any:
    """Return the shared PyJWKClient for *url*, creating it on first use."""
    from jwt import PyJWKClient

    cache_key = (PyJWKClient, url)
    client = _JWK_CLIENTS.get(cache_key)
    if client is None:
        client = _JWK_CLIENTS[cache_key] = PyJWKClient(url)
    return client


def verify_nas_fire_token(
    *,
    token: str,
    expected_audience: str,
    jwks_or_key: Optional[str] = None,
    issuer: Optional[str] = None,
    leeway_seconds: int = 30,
) -> Optional[Dict[str, Any]]:
    """Verify a NAS-minted cron-fire JWT. Return decoded claims, or None.

    Args:
        token: The compact JWT taken from the request's bearer credentials.
        expected_audience: The ``aud`` value the token must carry.
        jwks_or_key: Either a JWKS URL (``http://`` / ``https://``), from
            which the signing key is resolved via the token header, or an
            inline PEM public key (test / pinned-key deployments).
        issuer: When non-empty, the ``iss`` value the token must carry.
        leeway_seconds: Clock-skew allowance passed to PyJWT as ``leeway``.

    Checks (all must pass):
      - signature, restricted to RS256/RS384/RS512/ES256/ES384. Symmetric
        (HS*) and unsigned tokens are rejected because their ``alg`` is not
        in the allow-list.
      - ``exp`` and ``aud`` are present; ``aud`` == ``expected_audience``.
      - ``iss`` == ``issuer`` when an issuer is configured.
      - ``purpose`` == ``"cron_fire"`` — so a general agent JWT can't be
        replayed against the fire endpoint.

    Returns None (never raises) on any failure, so the handler can answer 401
    without leaking which check failed.
    """
    if not token or not expected_audience:
        return None
    if not jwks_or_key:
        # No verification key configured → cannot verify → refuse. We never
        # fall back to unsigned decode for a security boundary.
        logger.warning("cron fire: no JWKS/key configured; refusing token")
        return None

    try:
        import jwt

        if jwks_or_key.startswith(("http://", "https://")):
            # Resolve the signing key from the JWKS endpoint via the token
            # header, reusing the shared client so keys are cached.
            signing_key = _jwk_client_for(jwks_or_key).get_signing_key_from_jwt(token).key
        else:
            # A PEM public key passed inline (test / pinned-key deployments).
            signing_key = jwks_or_key

        decode_kwargs: Dict[str, Any] = {
            "algorithms": ["RS256", "RS384", "RS512", "ES256", "ES384"],
            "audience": expected_audience,
            "leeway": leeway_seconds,
            "options": {"require": ["exp", "aud"]},
        }
        if issuer:
            decode_kwargs["issuer"] = issuer

        claims = jwt.decode(token, signing_key, **decode_kwargs)
    except Exception as e:
        # Deliberately broad: every failure (bad signature, expiry, JWKS
        # fetch error, missing PyJWT) must collapse to "not verified".
        logger.warning("cron fire: token verification failed: %s", e)
        return None

    if claims.get("purpose") != _FIRE_PURPOSE:
        logger.warning("cron fire: token missing/!=%s purpose claim", _FIRE_PURPOSE)
        return None

    return claims


def get_fire_verifier() -> Callable[..., Optional[Dict[str, Any]]]:
    """Return the active inbound-fire verifier.

    Default = the NAS-JWT verifier. The DQ-4 escape hatch (direct per-job
    cron-key) would return a cron-key verifier here instead, selected by config
    — so the webhook handler never changes when the auth mode is swapped.
    """
    return verify_nas_fire_token
_MOD = "gateway.platforms.api_server"

# Dotted paths the handler imports lazily; patched per test.
_RESOLVER_PATH = "cron.scheduler_provider.resolve_cron_scheduler"
_VERIFIER_PATH = "plugins.cron.chronos.verify.get_fire_verifier"


def _make_adapter() -> APIServerAdapter:
    """Build an adapter with an API key set, so tests can show it is not used."""
    return APIServerAdapter(PlatformConfig(enabled=True, extra={"key": "sk-secret"}))


def _create_app(adapter: APIServerAdapter) -> web.Application:
    """Return a minimal aiohttp app exposing only the fire route."""
    app = web.Application(middlewares=[cors_middleware])
    app["api_server_adapter"] = adapter
    app.router.add_post("/api/cron/fire", adapter._handle_cron_fire)
    return app


@pytest.fixture
def adapter():
    return _make_adapter()


class _SpyProvider:
    """Records fire_due calls; stands in for the resolved provider."""

    def __init__(self):
        self.fired = []

    def fire_due(self, job_id, *, adapters=None, loop=None):
        self.fired.append(job_id)
        return True


def _install_spy(monkeypatch) -> _SpyProvider:
    """Patch resolve_cron_scheduler to return a fresh spy, and return the spy."""
    spy = _SpyProvider()
    monkeypatch.setattr(_RESOLVER_PATH, lambda: spy)
    return spy


def _stub_verifier(monkeypatch, result) -> None:
    """Patch get_fire_verifier so verification always yields *result*."""
    monkeypatch.setattr(_VERIFIER_PATH, lambda: (lambda **kw: result))


async def _wait_for_fire(spy: _SpyProvider, attempts: int = 50, interval: float = 0.01) -> None:
    """Poll until the background fire has landed, or the attempts run out."""
    for _ in range(attempts):
        if spy.fired:
            return
        await asyncio.sleep(interval)


@pytest.mark.asyncio
async def test_valid_token_accepts_and_fires(adapter, monkeypatch):
    """Valid NAS-JWT + {job_id} → 202 and fire_due invoked with that id."""
    spy = _install_spy(monkeypatch)
    _stub_verifier(monkeypatch, {"purpose": "cron_fire", "aud": "agent:x"})

    app = _create_app(adapter)
    async with TestClient(TestServer(app)) as cli:
        resp = await cli.post("/api/cron/fire",
                              headers={"Authorization": "Bearer good"},
                              json={"job_id": "abc123"})
        assert resp.status == 202
        data = await resp.json()
        assert data["job_id"] == "abc123"

        # fire runs in a background thread/task — give it a beat to land.
        await _wait_for_fire(spy)
        assert spy.fired == ["abc123"]


@pytest.mark.asyncio
async def test_invalid_token_401_and_no_fire(adapter, monkeypatch):
    """Bad/forged token → 401, fire_due NOT invoked."""
    spy = _install_spy(monkeypatch)
    _stub_verifier(monkeypatch, None)  # verification fails

    app = _create_app(adapter)
    async with TestClient(TestServer(app)) as cli:
        resp = await cli.post("/api/cron/fire",
                              headers={"Authorization": "Bearer forged"},
                              json={"job_id": "abc123"})
        assert resp.status == 401

        # Leave time for a (wrongly) scheduled fire to show up.
        await asyncio.sleep(0.05)
        assert spy.fired == []


@pytest.mark.asyncio
async def test_missing_token_401(adapter, monkeypatch):
    """No Authorization header → verifier gets empty token → 401."""
    spy = _install_spy(monkeypatch)
    # Real verifier: empty token returns None.
    app = _create_app(adapter)
    async with TestClient(TestServer(app)) as cli:
        resp = await cli.post("/api/cron/fire", json={"job_id": "abc123"})
        assert resp.status == 401
        assert spy.fired == []


@pytest.mark.asyncio
async def test_missing_job_id_400(adapter, monkeypatch):
    """Valid token but no job_id → 400, no fire."""
    spy = _install_spy(monkeypatch)
    _stub_verifier(monkeypatch, {"purpose": "cron_fire"})

    app = _create_app(adapter)
    async with TestClient(TestServer(app)) as cli:
        resp = await cli.post("/api/cron/fire",
                              headers={"Authorization": "Bearer good"},
                              json={})
        assert resp.status == 400
        assert spy.fired == []


@pytest.mark.asyncio
async def test_fire_does_not_require_api_server_key(adapter, monkeypatch):
    """The fire endpoint must NOT gate on API_SERVER_KEY — auth is the NAS-JWT.
    A request with NO API key header but a valid fire token still succeeds."""
    spy = _install_spy(monkeypatch)
    _stub_verifier(monkeypatch, {"purpose": "cron_fire"})

    app = _create_app(adapter)
    async with TestClient(TestServer(app)) as cli:
        # Bearer is the FIRE token, not the API_SERVER_KEY "sk-secret".
        resp = await cli.post("/api/cron/fire",
                              headers={"Authorization": "Bearer nas-jwt"},
                              json={"job_id": "j9"})
        assert resp.status == 202
        await _wait_for_fire(spy)
        assert spy.fired == ["j9"]
def _make_keypair():
    """Generate a fresh 2048-bit RSA keypair as ``(private_pem, public_pem)``."""
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    private_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    ).decode()
    public_pem = key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    ).decode()
    return private_pem, public_pem


@pytest.fixture(scope="module")
def rsa_keys():
    """An RS256 keypair: (private_pem, public_pem)."""
    return _make_keypair()


def _mint(priv, claims):
    """Sign *claims* with *priv* as an RS256 JWT."""
    import jwt
    return jwt.encode(claims, priv, algorithm="RS256")


AUD = "agent:inst-123"
ISS = "https://portal.nousresearch.com"


def _base_claims(**over):
    """Return a valid fire-token claim set, with *over* applied on top."""
    now = int(time.time())
    c = {
        "aud": AUD,
        "iss": ISS,
        "purpose": "cron_fire",
        "iat": now,
        "nbf": now - 5,
        "exp": now + 300,
    }
    c.update(over)
    return c


def _verify(token, key, *, issuer=ISS):
    """Run the verifier under test against the module's AUD."""
    from plugins.cron.chronos.verify import verify_nas_fire_token

    return verify_nas_fire_token(token=token, expected_audience=AUD,
                                 jwks_or_key=key, issuer=issuer)


def test_valid_token_returns_claims(rsa_keys):
    priv, pub = rsa_keys
    claims = _verify(_mint(priv, _base_claims()), pub)
    assert claims is not None
    assert claims["purpose"] == "cron_fire"
    assert claims["aud"] == AUD


def test_wrong_audience_rejected(rsa_keys):
    priv, pub = rsa_keys
    token = _mint(priv, _base_claims(aud="agent:someone-else"))
    assert _verify(token, pub) is None


def test_missing_purpose_rejected(rsa_keys):
    """A general agent JWT (no purpose=cron_fire) can't fire jobs."""
    priv, pub = rsa_keys
    claims = _base_claims()
    del claims["purpose"]
    assert _verify(_mint(priv, claims), pub) is None


def test_wrong_purpose_rejected(rsa_keys):
    priv, pub = rsa_keys
    token = _mint(priv, _base_claims(purpose="inference"))
    assert _verify(token, pub) is None


def test_expired_token_rejected(rsa_keys):
    priv, pub = rsa_keys
    now = int(time.time())
    # Expired 600s ago: well outside the verifier's default 30s leeway.
    token = _mint(priv, _base_claims(iat=now - 1000, nbf=now - 1000, exp=now - 600))
    assert _verify(token, pub) is None


def test_wrong_issuer_rejected(rsa_keys):
    priv, pub = rsa_keys
    token = _mint(priv, _base_claims(iss="https://evil.example"))
    assert _verify(token, pub) is None


def test_tampered_signature_rejected(rsa_keys):
    """A token signed by a DIFFERENT key must fail signature verification."""
    _, pub = rsa_keys
    attacker_priv, _ = _make_keypair()
    token = _mint(attacker_priv, _base_claims())
    # Verified against the REAL public key → signature mismatch → None.
    assert _verify(token, pub) is None


def test_symmetric_alg_confusion_rejected(rsa_keys):
    """An HS256 token HMAC-signed with the PUBLIC key as the secret is refused.

    This is the classic algorithm-confusion forgery. The token is assembled by
    hand because PyJWT refuses to HMAC-sign with a PEM key.
    """
    import base64
    import hashlib
    import hmac
    import json

    def b64url(raw):
        return base64.urlsafe_b64encode(raw).rstrip(b"=")

    _, pub = rsa_keys
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = b64url(json.dumps(_base_claims()).encode())
    signing_input = header + b"." + payload
    signature = b64url(hmac.new(pub.encode(), signing_input, hashlib.sha256).digest())
    token = (signing_input + b"." + signature).decode()

    assert _verify(token, pub) is None


def test_no_key_configured_refuses(rsa_keys):
    """No JWKS/key configured → refuse (never fall back to unsigned decode)."""
    priv, _ = rsa_keys
    token = _mint(priv, _base_claims())
    assert _verify(token, None, issuer=None) is None


def test_empty_token_refused(rsa_keys):
    _, pub = rsa_keys
    assert _verify("", pub, issuer=None) is None


def test_jwks_url_path_resolves_key(rsa_keys, monkeypatch):
    """The JWKS-URL branch resolves the signing key via PyJWKClient."""
    jwks_url = "https://portal.nousresearch.com/.well-known/jwks.json"
    priv, pub = rsa_keys
    token = _mint(priv, _base_claims())

    class FakeKey:
        key = pub

    class FakeJWKClient:
        def __init__(self, url):
            assert url == jwks_url

        def get_signing_key_from_jwt(self, tok):
            return FakeKey()

    monkeypatch.setattr("jwt.PyJWKClient", FakeJWKClient)
    claims = _verify(token, jwks_url)
    assert claims is not None and claims["purpose"] == "cron_fire"


def test_get_fire_verifier_returns_nas_verifier():
    from plugins.cron.chronos.verify import get_fire_verifier, verify_nas_fire_token

    assert get_fire_verifier() is verify_nas_fire_token