hermes-agent/plugins/cron/chronos/_nas_client.py
Ben 4c8bbe6416 feat(cron): Chronos NAS-mediated managed-cron provider (scale-to-zero)
Phase 4D. The first non-default CronScheduler: plugins/cron/chronos/. Inert
unless cron.provider=chronos; resolve_cron_scheduler falls back to the built-in
if unavailable, so cron never loses its trigger.

Files:
- chronos/__init__.py — ChronosCronScheduler + register(ctx).
  * is_available(): config-only, NO network (portal_url + callback_url + a
    stored Nous access token via get_provider_auth_state). Returns False →
    resolver falls back to built-in.
  * start(): reconcile() then RETURN — no blocking loop, no 60s wake (DQ-1:
    this is what makes scale-to-zero real; the machine wakes only on a
    NAS→agent fire).
  * _arm_one_shot(job): POST NAS provision {job_id, fire_at, agent_callback_url,
    dedup_key=job_id:fire_at}. Agent owns the time → sub-minute fires survive
    (no scheduler 1-minute floor).
  * reconcile(): converge NAS arms toward jobs.json — arm missing/changed-time,
    cancel orphaned, skip paused. Cold process rebuilds from jobs.json +
    idempotent dedup_key.
  * on_jobs_changed(): reconcile (re-arm/cancel the affected one-shot).
  * fire_due(): ABC default (CAS claim + run_one_job) THEN re-arm the next
    one-shot. Job gone (one-shot done / repeat-N exhausted) → no re-arm.
- chronos/_nas_client.py — thin HTTP wrapper for provision/cancel/list using
  the agent's existing refresh-aware Nous token (resolve_nous_access_token).
  Names no scheduler vendor; holds no scheduler creds.
- chronos/plugin.yaml — discovery metadata.

INVARIANT: zero "qstash"/"upstash" hits in plugins/cron, gateway, hermes_cli,
website/docs — the external scheduler is a NAS-internal detail, never named
agent-side.

Tests (13, all NAS mocked, zero network): is_available off-without-config +
on-with-config + makes-no-network; arm payload incl. sub-minute + noop without
next_run; reconcile arms-all / cancels-orphan / skips-paused / skips-already-
armed; fire_due re-arms next / no re-arm when job gone / no re-arm when claim
lost.
2026-06-18 14:40:56 +10:00

123 lines
4.7 KiB
Python

"""Thin HTTP client for the agent → NAS ``agent-cron`` endpoints (Chronos).
The Chronos provider speaks ONLY to NAS — it names no scheduler vendor and
holds no scheduler credentials. NAS owns the external scheduler (an internal
implementation detail) and that scheduler's account; the agent just asks NAS to
"arm a one-shot at time T" / "cancel" / "list", authenticated with the agent's
existing Nous Portal access token (the same token it already uses to call the
portal — no new secret).
Wire contract: ``docs/chronos-managed-cron-contract.md``.
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
# Module logger, registered under the fixed name "cron.chronos".
logger = logging.getLogger("cron.chronos")
# Endpoint paths under the portal base URL. Each one starts with "/" and is
# appended directly to NasCronClient.portal_url, whose trailing slashes are
# stripped in NasCronClient.__init__.
_PROVISION_PATH = "/api/agent-cron/provision"  # POSTed to by NasCronClient.provision()
_CANCEL_PATH = "/api/agent-cron/cancel"  # POSTed to by NasCronClient.cancel()
_LIST_PATH = "/api/agent-cron/list"  # fetched with GET by NasCronClient.list_armed()
class NasCronClientError(RuntimeError):
    """Signal that an agent→NAS ``agent-cron`` call did not succeed.

    Raised both when the request could not be completed (transport error)
    and when NAS answered with a non-2xx status. It subclasses
    ``RuntimeError``, so existing ``except RuntimeError`` handlers still
    catch it.
    """
class NasCronClient:
    """Minimal client for the agent→NAS provision/cancel/list endpoints.

    Authenticates with the agent's refresh-aware Nous access token. No
    scheduler vendor and no scheduler credentials appear here — NAS hides all
    of that behind these three calls.

    Every failed call (transport error, non-2xx status, or an error raised
    while resolving the access token for the request) surfaces as
    ``NasCronClientError``.
    """

    def __init__(self, portal_url: str, *, timeout_seconds: float = 15.0) -> None:
        """Store the portal base URL and the per-request timeout.

        Args:
            portal_url: Portal base URL. Trailing slashes are stripped so the
                endpoint paths (which start with ``/``) can be appended as-is.
            timeout_seconds: Timeout passed to ``requests`` on every call.
        """
        self.portal_url = portal_url.rstrip("/")
        self.timeout_seconds = timeout_seconds

    # -- auth -------------------------------------------------------------

    def _access_token(self) -> str:
        """Return the agent's existing Nous Portal access token (refresh-aware).

        Raises:
            NasCronClientError: If the resolver yields no token. Failing here
                avoids sending a meaningless ``Bearer None`` / ``Bearer ``
                header over the network.
        """
        # Resolved at call time, not at module import time.
        from hermes_cli.auth import resolve_nous_access_token

        token = resolve_nous_access_token()
        if not token:
            raise NasCronClientError("no Nous access token available")
        return token

    def _headers(self) -> Dict[str, str]:
        """Build the auth + JSON content-type headers sent with every request."""
        return {
            "Authorization": f"Bearer {self._access_token()}",
            "Content-Type": "application/json",
        }

    # -- HTTP -------------------------------------------------------------

    def _post(self, path: str, body: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``body`` as JSON to ``path`` and return the decoded JSON object.

        Raises:
            NasCronClientError: On transport failure or a non-2xx response.
        """
        import requests  # lazy: agent already depends on requests

        return self._send("POST", path, requests.post, json=body)

    def _get(self, path: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """GET ``path`` with ``params`` and return the decoded JSON object.

        Raises:
            NasCronClientError: On transport failure or a non-2xx response.
        """
        import requests  # lazy: agent already depends on requests

        return self._send("GET", path, requests.get, params=params)

    def _send(self, verb: str, path: str, call: Any, **kwargs: Any) -> Dict[str, Any]:
        """Run one HTTP call and apply the shared error/decoding rules.

        Args:
            verb: HTTP method name; used only in error messages.
            path: Endpoint path appended to ``self.portal_url``.
            call: Callable performing the request (``requests.post`` or
                ``requests.get``); invoked as ``call(url, headers=...,
                timeout=..., **kwargs)``.
            **kwargs: Method-specific arguments (``json=`` or ``params=``).

        Returns:
            The response body as a dict (``{}`` when there is no usable body).

        Raises:
            NasCronClientError: If the call raises or the status is not 2xx.
        """
        url = f"{self.portal_url}{path}"
        try:
            # Headers are built inside the try so that a token-resolution
            # failure is reported as NasCronClientError like any other
            # failed call.
            resp = call(
                url, headers=self._headers(), timeout=self.timeout_seconds, **kwargs
            )
        except Exception as e:
            raise NasCronClientError(f"{verb} {path} failed: {e}") from e
        if resp.status_code // 100 != 2:
            # Only the first 200 characters of the body go into the message.
            raise NasCronClientError(
                f"{verb} {path} returned {resp.status_code}: {resp.text[:200]}"
            )
        return self._decode_body(resp)

    @staticmethod
    def _decode_body(resp: Any) -> Dict[str, Any]:
        """Decode a response body into a dict, best-effort.

        An empty body, a body that fails to parse as JSON, and a JSON value
        that is not an object all yield ``{}``, so callers can rely on the
        declared ``Dict`` return type.
        """
        try:
            payload = resp.json() if resp.content else {}
        except Exception:
            # Deliberately best-effort: keep returning {} rather than raising,
            # but leave a trace instead of swallowing the failure silently.
            logger.debug(
                "NAS agent-cron response body is not valid JSON; ignoring it",
                exc_info=True,
            )
            return {}
        return payload if isinstance(payload, dict) else {}

    # -- endpoints --------------------------------------------------------

    def provision(self, *, job_id: str, fire_at: str, agent_callback_url: str,
                  dedup_key: str) -> Dict[str, Any]:
        """Ask NAS to arm a one-shot for ``job_id`` at ``fire_at`` (ISO 8601).

        ``dedup_key`` (``{job_id}:{fire_at}``) makes re-arming the same fire
        idempotent NAS-side. Returns the NAS response (e.g. ``{schedule_id}``).

        Raises:
            NasCronClientError: On transport failure or a non-2xx response.
        """
        return self._post(_PROVISION_PATH, {
            "job_id": job_id,
            "fire_at": fire_at,
            "agent_callback_url": agent_callback_url,
            "dedup_key": dedup_key,
        })

    def cancel(self, *, job_id: str) -> Dict[str, Any]:
        """Ask NAS to cancel any armed one-shot for ``job_id``.

        Raises:
            NasCronClientError: On transport failure or a non-2xx response.
        """
        return self._post(_CANCEL_PATH, {"job_id": job_id})

    def list_armed(self) -> List[Dict[str, Any]]:
        """List the one-shots NAS currently has armed for this agent.

        Returns a list of ``{job_id, fire_at, schedule_id}``. Best-effort: used
        by reconcile to find orphaned arms on a cold process; on error the
        caller falls back to idempotent re-arm of all desired jobs.

        Raises:
            NasCronClientError: On transport failure or a non-2xx response.
        """
        return self._armed_items(self._get(_LIST_PATH, {}))

    @staticmethod
    def _armed_items(data: Any) -> List[Dict[str, Any]]:
        """Extract the ``armed`` list from a list response, dropping junk.

        Anything that is not a dict carrying a list under ``"armed"`` yields
        ``[]``; entries inside the list that are not dicts are skipped so the
        result always matches the declared element type.
        """
        items = data.get("armed") if isinstance(data, dict) else None
        if not isinstance(items, list):
            return []
        return [item for item in items if isinstance(item, dict)]