From a657397769ab69b3bc72afca38161e04ee36aff7 Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 18 Jun 2026 13:08:21 +1000 Subject: [PATCH 01/12] test(cron): characterize in-process + desktop ticker contract before provider refactor --- tests/cron/test_scheduler_provider.py | 83 +++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 tests/cron/test_scheduler_provider.py diff --git a/tests/cron/test_scheduler_provider.py b/tests/cron/test_scheduler_provider.py new file mode 100644 index 00000000000..1e94347dfa8 --- /dev/null +++ b/tests/cron/test_scheduler_provider.py @@ -0,0 +1,83 @@ +"""Characterization tests for the cron trigger before/after the provider refactor. + +These lock the CURRENT in-process-ticker contract (Phase 0 of the pluggable +CronScheduler plan, .hermes/plans/cron-scheduler-provider-interface.md). They +must pass unchanged on `main` now, and after every subsequent phase of the +refactor — they are the regression harness that proves the built-in firing +behavior is byte-for-byte preserved when the ticker is moved behind the +CronScheduler provider interface. + +No production code is exercised beyond the two ticker entry points: + - gateway/run.py::_start_cron_ticker (production gateway ticker) + - hermes_cli/web_server.py::_start_desktop_cron_ticker (desktop fallback) + +Both call `cron.scheduler.tick(...)` on a loop and exit when their stop_event +is set. We patch `cron.scheduler.tick` (both tickers import it locally as +`cron_tick`, so the module-attribute patch is observed) and assert the loop +drives it and stops promptly. 
def test_ticker_calls_tick_at_least_once_then_stops():
    """The gateway ticker drives ``cron.scheduler.tick`` and stops on request.

    Characterizes ``gateway.run._start_cron_ticker``: once started it must
    call ``tick`` with ``sync=False``, and it must return promptly after the
    stop event is set.

    The test waits on an Event set by the fake tick rather than sleeping for
    a fixed period, so it neither races a slow scheduler nor lets the
    ``interval=0`` loop spin longer than needed.
    """
    from gateway.run import _start_cron_ticker

    calls = []
    ticked = threading.Event()
    stop = threading.Event()

    def fake_tick(*args, **kwargs):
        calls.append(kwargs)
        ticked.set()
        return 0

    with patch("cron.scheduler.tick", side_effect=fake_tick):
        # interval=0 keeps the loop tight so the first tick arrives at once.
        t = threading.Thread(
            target=_start_cron_ticker,
            args=(stop,),
            kwargs={"interval": 0},
            daemon=True,
        )
        t.start()
        try:
            first_tick_seen = ticked.wait(timeout=5)
        finally:
            # Always stop the loop, even if the wait above is interrupted,
            # so a failing run never leaves a spinning thread behind.
            stop.set()
            t.join(timeout=5)

    assert first_tick_seen, "ticker never called tick()"
    assert not t.is_alive(), "ticker did not exit after stop_event was set"
    assert len(calls) >= 1, "ticker never called tick()"
    # Contract: the ticker invokes tick with sync=False (fire-and-forget from
    # the background thread, never the synchronous CLI path).
    assert calls[0].get("sync") is False
Desktop has no live adapters, so it ticks with + no adapters/loop.""" + from hermes_cli.web_server import _start_desktop_cron_ticker + + calls = [] + stop = threading.Event() + + def fake_tick(*args, **kwargs): + calls.append(kwargs) + return 0 + + with patch("cron.scheduler.tick", side_effect=fake_tick): + t = threading.Thread( + target=_start_desktop_cron_ticker, + args=(stop,), + kwargs={"interval": 0}, + daemon=True, + ) + t.start() + time.sleep(0.2) + stop.set() + t.join(timeout=5) + + assert not t.is_alive(), "desktop ticker did not exit after stop_event was set" + assert len(calls) >= 1, "desktop ticker never called tick()" + assert calls[0].get("sync") is False From e6ff41ca9516cbca6470a56b1ab98939dbdb935a Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 18 Jun 2026 13:58:43 +1000 Subject: [PATCH 02/12] feat(cron): CronScheduler ABC + InProcessCronScheduler (provider #1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 1 of the pluggable cron-scheduler refactor (Axis B — the trigger). No call-site changes; this phase only makes the abstraction exist + tested in isolation. Task 1.1: cron/scheduler_provider.py — the EXPERIMENTAL CronScheduler ABC. Required surface is name + start; is_available()/stop() carry safe defaults. is_available has a no-network invariant. Docstring marks it experimental until the Chronos provider (Phase 4) validates the shape. Task 1.2: InProcessCronScheduler wraps the historical 60s ticker loop, calling cron.scheduler.tick(sync=False) exactly as the raw ticker does. Uses stop_event.wait(interval) for responsive stop (both raw tickers already do). Tests: ABC-is-abstract, default-is_available, the InProcess loop drives tick and stops, stop() no-op, and test_abc_growth_stays_additive (the forward-compat guard: required abstractmethods must stay exactly {name, start}, so the three Phase-4 hooks land as NON-abstract additions). 
class CronScheduler(ABC):
    """Trigger provider for cron (Axis B): it decides WHEN a due job fires.

    Only two members are mandatory for implementers: the ``name`` property
    and ``start``. ``is_available`` and ``stop`` come with safe defaults.
    The three Phase-4 hooks (``on_jobs_changed`` / ``fire_due`` /
    ``reconcile``) are meant to arrive later as NON-abstract methods, so the
    built-in provider keeps satisfying this ABC without overriding them; see
    ``test_abc_growth_stays_additive``.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Return the provider's short identifier, e.g. 'builtin', 'chronos'."""

    @abstractmethod
    def start(
        self,
        stop_event: threading.Event,
        *,
        adapters: Any = None,
        loop: Any = None,
        interval: int = 60,
    ) -> None:
        """Start firing due jobs.

        The built-in implementation BLOCKS in its tick loop until
        ``stop_event`` is set; the caller is expected to run it inside a
        daemon thread. An external provider may instead register a
        schedule/webhook and return straight away, but it must still honor
        ``stop_event`` for teardown.

        Args:
            stop_event: Set by the caller to request shutdown.
            adapters: Passed through to the tick by the built-in provider.
            loop: Passed through to the tick by the built-in provider.
            interval: Seconds between ticks (60 by default).
        """

    def is_available(self) -> bool:
        """Report whether this provider can run in the current environment.

        Implementations MUST NOT make network calls here. The default says
        "yes"; an external provider should check that its endpoint and
        credentials are configured. The resolver falls back to the built-in
        when a named provider answers False.
        """
        return True

    def stop(self) -> None:
        """Eagerly tear down the provider (optional hook).

        Does nothing by default, because setting the stop event is the
        primary stop signal. Providers that hold external resources (queue
        consumers, HTTP servers) should override it.
        """
def test_cronscheduler_default_is_available_true():
    """A subclass that implements only the abstract members inherits an
    ``is_available`` that answers True."""
    from cron.scheduler_provider import CronScheduler

    class MinimalScheduler(CronScheduler):
        # Only the two abstract members are supplied; everything else is
        # inherited from the ABC.
        @property
        def name(self):
            return "dummy"

        def start(self, stop_event, **kw):
            pass

    scheduler = MinimalScheduler()
    assert scheduler.is_available() is True
def test_inprocess_provider_ticks_and_stops():
    """The built-in provider drives ``cron.scheduler.tick(sync=False)`` on a
    loop and exits promptly when the stop event is set.

    This is the same contract the raw ticker tests pin down. The test waits
    on an Event set by the fake tick instead of sleeping for a fixed period,
    so it is not timing-dependent and the ``interval=0`` loop does not spin
    longer than needed.
    """
    from cron.scheduler_provider import InProcessCronScheduler

    calls = []
    ticked = threading.Event()
    stop = threading.Event()
    prov = InProcessCronScheduler()
    assert prov.name == "builtin"

    def fake_tick(*args, **kwargs):
        calls.append(kwargs)
        ticked.set()
        return 0

    with patch("cron.scheduler.tick", side_effect=fake_tick):
        t = threading.Thread(
            target=prov.start, args=(stop,), kwargs={"interval": 0}, daemon=True
        )
        t.start()
        try:
            first_tick_seen = ticked.wait(timeout=5)
        finally:
            # Always stop the loop so a failing run cannot leak a busy thread.
            stop.set()
            t.join(timeout=5)

    assert first_tick_seen, "provider never called tick()"
    assert not t.is_alive(), "provider did not exit after stop_event was set"
    assert len(calls) >= 1, "provider never called tick()"
    assert calls[0].get("sync") is False
resolver MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 2 of the pluggable cron-scheduler refactor. Still no call-site changes; this wires up provider SELECTION with a hard safety net. Task 2.1: cron.provider config key (hermes_cli/config.py), empty = built-in. Additive key — deep-merge picks it up into existing configs with no version bump (verified: load_config() yields the key on a pre-existing config.yaml). Task 2.2: plugins/cron/__init__.py — discovery machinery cloned near-verbatim from plugins/memory/__init__.py, retargeted at CronScheduler / register_cron_scheduler. Bundled (plugins/cron/<name>/) + user ($HERMES_HOME/plugins/<name>/) dirs, bundled wins collisions. The built-in is NOT discovered here — it's core, so the fallback can't be removed. Task 2.3: resolve_cron_scheduler() in cron/scheduler_provider.py — reads cron.provider and ALWAYS degrades to built-in (missing / unavailable / load error / typo all fall back with a warning). cron can never be left without a trigger. Deviation from plan: the plan's resolver snippet used cfg_get("cron.provider") (dotted-string form). The real cfg_get signature is cfg_get(cfg, *keys, default=) — corrected to cfg_get(load_config(), "cron", "provider", default=""), matching plugins/memory/__init__.py:349. Tests monkeypatch load_config (not cfg_get) so the real traversal runs. Tests: default key empty, discovery returns list, unknown load returns None, and the five resolver paths (empty→builtin, no-section→builtin, unknown→builtin, unavailable→builtin, available→used). Full tests/cron/: 453 passed; config suite green (additive key, no migration break).
def resolve_cron_scheduler() -> "CronScheduler":
    """Return the active cron scheduler provider; never raises.

    Reads ``cron.provider`` from config. An empty or absent value, or one of
    the built-in aliases (``builtin`` / ``in-process`` / ``inprocess``,
    matched case-insensitively), selects the built-in ticker.

    Every failure mode degrades to the built-in with a warning, because cron
    must never be left without a trigger. That covers an unreadable config,
    a provider that is not found, one that raises while loading, and one
    that reports ``is_available() == False``.

    Returns:
        The configured provider when it loads and is available, otherwise a
        fresh ``InProcessCronScheduler``.
    """
    import logging

    logger = logging.getLogger("cron.scheduler_provider")

    try:
        from hermes_cli.config import cfg_get, load_config
        raw = cfg_get(load_config(), "cron", "provider", default="")
        # str() so a non-string YAML scalar (e.g. ``provider: true``) is
        # reported below as an unknown provider instead of failing on .strip().
        name = str(raw or "").strip()
    except Exception as e:
        # Previously swallowed silently: the user's provider choice would be
        # ignored with no trace. Surface it, then use the safe default.
        logger.warning(
            "Could not read cron.provider from config (%s); using built-in ticker", e
        )
        return InProcessCronScheduler()

    if not name or name.lower() in ("builtin", "in-process", "inprocess"):
        return InProcessCronScheduler()

    try:
        from plugins.cron import load_cron_scheduler
        provider = load_cron_scheduler(name)
        if provider is None:
            logger.warning("cron.provider '%s' not found; using built-in ticker", name)
            return InProcessCronScheduler()
        if not provider.is_available():
            logger.warning("cron.provider '%s' not available; using built-in ticker", name)
            return InProcessCronScheduler()
        logger.info("Using cron scheduler provider: %s", provider.name)
        return provider
    except Exception as e:
        # Covers import errors in plugins.cron and exceptions raised by the
        # provider's own is_available()/name.
        logger.warning(
            "Failed to load cron.provider '%s' (%s); using built-in ticker", name, e
        )
        return InProcessCronScheduler()
diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 356839f9903..d53393ac432 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -2124,6 +2124,14 @@ DEFAULT_CONFIG = { }, "cron": { + # Active cron SCHEDULER provider (Axis B — the trigger that decides + # WHEN a due job fires). Empty string = the built-in in-process 60s + # ticker (default). Name an installed provider (plugins/cron// or + # $HERMES_HOME/plugins//) to relocate the trigger — e.g. "chronos", + # the NAS-mediated managed-cron provider for scale-to-zero deployments. + # An unknown or unavailable provider falls back to the built-in, so cron + # never loses its trigger. + "provider": "", # Wrap delivered cron responses with a header (task name) and footer # ("The agent cannot see this message"). Set to false for clean output. "wrap_response": True, diff --git a/plugins/cron/__init__.py b/plugins/cron/__init__.py new file mode 100644 index 00000000000..fbf1ac2eb08 --- /dev/null +++ b/plugins/cron/__init__.py @@ -0,0 +1,344 @@ +"""Cron scheduler provider plugin discovery. + +Scans two directories for cron scheduler provider plugins: + +1. Bundled providers: ``plugins/cron//`` (shipped with hermes-agent) +2. User-installed providers: ``$HERMES_HOME/plugins//`` + +Each subdirectory must contain ``__init__.py`` with a class implementing the +``CronScheduler`` ABC (``cron/scheduler_provider.py``). On name collisions, +bundled providers take precedence. + +This is a near-verbatim clone of ``plugins/memory/__init__.py`` — the same +discovery/loader machinery, retargeted at ``CronScheduler``. The built-in +``InProcessCronScheduler`` is NOT discovered here: it is core (lives in +``cron/scheduler_provider.py``) so the fallback can never be accidentally +removed. Only NON-default providers (e.g. "chronos") live under this directory. + +Only ONE provider can be active at a time, selected via ``cron.provider`` in +config.yaml (empty = built-in). 
See ``cron.scheduler_provider.resolve_cron_scheduler``. + +Usage: + from plugins.cron import discover_cron_schedulers, load_cron_scheduler + + available = discover_cron_schedulers() # [(name, desc, available), ...] + provider = load_cron_scheduler("chronos") # CronScheduler instance +""" + +from __future__ import annotations + +import importlib +import importlib.machinery +import importlib.util +import logging +import sys +from pathlib import Path +from typing import List, Optional, Tuple + +logger = logging.getLogger(__name__) + +_CRON_PLUGINS_DIR = Path(__file__).parent + +# Synthetic parent package for user-installed providers, so they don't +# collide with bundled providers in sys.modules. +_USER_NAMESPACE = "_hermes_user_cron" + + +def _register_synthetic_package(name: str, search_locations: List[str]) -> None: + """Register an empty package shell in sys.modules. + + User-installed providers import as ``_hermes_user_cron.``, a dotted + name whose parents exist nowhere on disk. Unless those parents are present + in ``sys.modules``, any relative import inside the plugin + (``from . import config``) fails with + ``ModuleNotFoundError: No module named '_hermes_user_cron'`` — the same + reason the loader already registers ``plugins`` and ``plugins.cron`` for + bundled providers. 
def _is_cron_provider_dir(path: Path) -> bool:
    """Guess whether *path* holds a cron scheduler provider plugin.

    This is a cheap text scan with no import: the directory qualifies when
    the first 8192 characters of its ``__init__.py`` mention either
    ``register_cron_scheduler`` or ``CronScheduler``.

    Returns:
        True when a marker is found; False when ``__init__.py`` is missing,
        cannot be read, or contains neither marker in the scanned prefix.
    """
    candidate = path / "__init__.py"
    if candidate.exists():
        try:
            head = candidate.read_text(errors="replace")[:8192]
        except Exception:
            return False
        markers = ("register_cron_scheduler", "CronScheduler")
        return any(marker in head for marker in markers)
    return False
def find_provider_dir(name: str) -> Optional[Path]:
    """Resolve a provider name to its plugin directory.

    Bundled providers (``plugins/cron/<name>/``) are checked first, then
    user-installed ones (``$HERMES_HOME/plugins/<name>/``), so bundled wins
    on a name collision.

    Args:
        name: A bare provider directory name, e.g. ``"chronos"``.

    Returns:
        The provider directory, or ``None`` when *name* is not a valid bare
        name or no matching provider directory exists.
    """
    # A provider name must be a single path component. Without this guard,
    # "" and "." resolve to the plugins/cron package itself, and "../x" or an
    # absolute path escapes the plugin roots entirely.
    if not isinstance(name, str) or not name:
        return None
    if "/" in name or "\\" in name:
        return None
    # Stay consistent with discovery, which skips "_"- and "."-prefixed
    # directories; this also rejects "." and "..".
    if name.startswith(("_", ".")):
        return None

    # Bundled
    bundled = _CRON_PLUGINS_DIR / name
    if bundled.is_dir() and (bundled / "__init__.py").exists():
        return bundled
    # User-installed
    user_dir = _get_user_plugins_dir()
    if user_dir:
        user = user_dir / name
        if user.is_dir() and _is_cron_provider_dir(user):
            return user
    return None
def _load_provider_from_dir(provider_dir: Path) -> Optional["CronScheduler"]:  # noqa: F821
    """Import the provider package at *provider_dir* and return its scheduler.

    Two plugin shapes are supported, tried in this order:

    1. A module-level ``register(ctx)`` function. It is called with a
       ``_ProviderCollector``; whatever it hands to
       ``ctx.register_cron_scheduler(...)`` is returned.
    2. A ``CronScheduler`` subclass defined inside the plugin package, which
       is instantiated with no arguments. Classes the plugin merely imported
       from elsewhere (for example the built-in scheduler) are ignored.

    Args:
        provider_dir: Directory containing the provider's ``__init__.py``.

    Returns:
        The provider instance, or ``None`` when ``__init__.py`` is missing,
        the package fails to import, or neither shape yields an instance.
    """
    name = provider_dir.name
    # Use a separate namespace for user-installed plugins so they don't
    # collide with bundled providers in sys.modules. ``parents`` already
    # includes the immediate parent, so one membership test is enough.
    _is_bundled = _CRON_PLUGINS_DIR in provider_dir.parents
    module_name = f"plugins.cron.{name}" if _is_bundled else f"{_USER_NAMESPACE}.{name}"
    init_file = provider_dir / "__init__.py"

    if not init_file.exists():
        return None

    # Check if already loaded. A synthetic package shell has no __file__;
    # only reuse modules that were actually loaded from disk.
    cached = sys.modules.get(module_name)
    if cached is not None and getattr(cached, "__file__", None):
        mod = cached
    else:
        # Ensure the parent packages are registered (for relative imports)
        for parent in ("plugins", "plugins.cron"):
            if parent in sys.modules:
                continue
            parent_path = Path(__file__).parent
            if parent == "plugins":
                parent_path = parent_path.parent
            parent_init = parent_path / "__init__.py"
            if not parent_init.exists():
                continue
            parent_spec = importlib.util.spec_from_file_location(
                parent, str(parent_init),
                submodule_search_locations=[str(parent_path)]
            )
            if parent_spec:
                parent_mod = importlib.util.module_from_spec(parent_spec)
                sys.modules[parent] = parent_mod
                try:
                    parent_spec.loader.exec_module(parent_mod)
                except Exception as e:
                    # Best effort: the registered shell is still enough for
                    # relative imports to resolve, so keep going.
                    logger.debug("Failed to exec parent package %s: %s", parent, e)

        # User-installed plugins need their synthetic parent registered the
        # same way, or relative imports inside the plugin cannot resolve.
        if not _is_bundled:
            _register_synthetic_package(_USER_NAMESPACE, [])

        # Now load the provider module
        spec = importlib.util.spec_from_file_location(
            module_name, str(init_file),
            submodule_search_locations=[str(provider_dir)]
        )
        if not spec or spec.loader is None:
            return None

        mod = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = mod

        # Register submodules so relative imports work
        # e.g., "from ._nas_client import NasCronClient" in the chronos plugin.
        # Sorted so the load order does not depend on the filesystem.
        registered_subs = []
        for sub_file in sorted(provider_dir.glob("*.py")):
            if sub_file.name == "__init__.py":
                continue
            full_sub_name = f"{module_name}.{sub_file.stem}"
            if full_sub_name in sys.modules:
                continue
            sub_spec = importlib.util.spec_from_file_location(
                full_sub_name, str(sub_file)
            )
            if not sub_spec or sub_spec.loader is None:
                continue
            sub_mod = importlib.util.module_from_spec(sub_spec)
            sys.modules[full_sub_name] = sub_mod
            try:
                sub_spec.loader.exec_module(sub_mod)
            except Exception as e:
                logger.debug("Failed to load submodule %s: %s", full_sub_name, e)
                # Do not leave a half-initialised module behind. A later
                # relative import will retry it and raise the real error.
                sys.modules.pop(full_sub_name, None)
            else:
                registered_subs.append(full_sub_name)

        try:
            spec.loader.exec_module(mod)
        except Exception as e:
            logger.debug("Failed to exec_module %s: %s", module_name, e)
            sys.modules.pop(module_name, None)
            # Also drop the submodules registered above, so a retry starts
            # from a clean slate instead of reusing orphans.
            for sub_name in registered_subs:
                sys.modules.pop(sub_name, None)
            return None

    # Try register(ctx) pattern first (how our plugins are written)
    if hasattr(mod, "register"):
        collector = _ProviderCollector()
        try:
            mod.register(collector)
            # Identity check: a provider must not be discarded just because
            # it happens to define a falsy __bool__/__len__.
            if collector.provider is not None:
                return collector.provider
        except Exception as e:
            logger.debug("register() failed for %s: %s", name, e)

    # Fallback: find a CronScheduler subclass and instantiate it
    from cron.scheduler_provider import CronScheduler
    package_prefix = module_name + "."
    for attr_name in dir(mod):
        attr = getattr(mod, attr_name, None)
        if not (isinstance(attr, type) and issubclass(attr, CronScheduler)
                and attr is not CronScheduler):
            continue
        # dir() is alphabetical, so an imported class such as
        # InProcessCronScheduler could otherwise be picked ahead of the
        # plugin's own scheduler. Accept only classes this package defines.
        owner = getattr(attr, "__module__", "") or ""
        if owner != module_name and not owner.startswith(package_prefix):
            continue
        try:
            return attr()
        except Exception as e:
            # e.g. an abstract intermediate class, or a constructor that
            # needs arguments; try the next candidate.
            logger.debug("Could not instantiate %s.%s: %s", module_name, attr_name, e)

    return None
def register_cron_scheduler(self, provider): + self.provider = provider + + # No-op for other registration methods + def register_tool(self, *args, **kwargs): + pass + + def register_hook(self, *args, **kwargs): + pass + + def register_memory_provider(self, *args, **kwargs): + pass + + def register_cli_command(self, *args, **kwargs): + pass diff --git a/tests/cron/test_scheduler_provider.py b/tests/cron/test_scheduler_provider.py index 74b3891122c..8fdbb305a0f 100644 --- a/tests/cron/test_scheduler_provider.py +++ b/tests/cron/test_scheduler_provider.py @@ -159,3 +159,106 @@ def test_inprocess_provider_stop_is_noop(): from cron.scheduler_provider import InProcessCronScheduler assert InProcessCronScheduler().stop() is None + + +# ── Phase 2: config key, discovery, resolver ───────────────────────────────── + + +def test_default_config_cron_provider_is_empty(): + """The new cron.provider key defaults to empty (= built-in).""" + from hermes_cli.config import DEFAULT_CONFIG + + assert DEFAULT_CONFIG["cron"]["provider"] == "" + + +def test_discover_cron_schedulers_returns_list(): + """Discovery returns a list. 
May be empty — the built-in is core, not + discovered, and no bundled non-default provider ships yet.""" + from plugins.cron import discover_cron_schedulers + + result = discover_cron_schedulers() + assert isinstance(result, list) + + +def test_load_unknown_cron_scheduler_returns_none(): + from plugins.cron import load_cron_scheduler + + assert load_cron_scheduler("does-not-exist-xyz") is None + + +def test_resolve_defaults_to_builtin(monkeypatch): + """Empty cron.provider → built-in.""" + import hermes_cli.config as cfg + from cron import scheduler_provider as sp + + monkeypatch.setattr(cfg, "load_config", lambda: {"cron": {"provider": ""}}) + prov = sp.resolve_cron_scheduler() + assert prov.name == "builtin" + + +def test_resolve_no_cron_section_falls_back_to_builtin(monkeypatch): + """Config with no cron section at all → built-in (cfg_get returns default).""" + import hermes_cli.config as cfg + from cron import scheduler_provider as sp + + monkeypatch.setattr(cfg, "load_config", lambda: {}) + prov = sp.resolve_cron_scheduler() + assert prov.name == "builtin" + + +def test_resolve_unknown_provider_falls_back_to_builtin(monkeypatch): + """A named provider that doesn't exist → built-in (cron never dies).""" + import hermes_cli.config as cfg + from cron import scheduler_provider as sp + + monkeypatch.setattr(cfg, "load_config", lambda: {"cron": {"provider": "nope-not-real"}}) + prov = sp.resolve_cron_scheduler() + assert prov.name == "builtin" + + +def test_resolve_unavailable_provider_falls_back(monkeypatch): + """A provider that loads but reports is_available()==False → built-in.""" + import hermes_cli.config as cfg + import plugins.cron as pc + from cron import scheduler_provider as sp + from cron.scheduler_provider import CronScheduler + + class Unavailable(CronScheduler): + @property + def name(self): + return "unavailable" + + def is_available(self): + return False + + def start(self, stop_event, **kw): + pass + + monkeypatch.setattr(cfg, "load_config", 
lambda: {"cron": {"provider": "unavailable"}}) + monkeypatch.setattr(pc, "load_cron_scheduler", lambda n: Unavailable()) + prov = sp.resolve_cron_scheduler() + assert prov.name == "builtin" + + +def test_resolve_available_provider_is_used(monkeypatch): + """A provider that loads and is available is returned (not the fallback).""" + import hermes_cli.config as cfg + import plugins.cron as pc + from cron import scheduler_provider as sp + from cron.scheduler_provider import CronScheduler + + class Fake(CronScheduler): + @property + def name(self): + return "fake" + + def is_available(self): + return True + + def start(self, stop_event, **kw): + pass + + monkeypatch.setattr(cfg, "load_config", lambda: {"cron": {"provider": "fake"}}) + monkeypatch.setattr(pc, "load_cron_scheduler", lambda n: Fake()) + prov = sp.resolve_cron_scheduler() + assert prov.name == "fake" From abbd8646eb511833500377799f5853d8d4eda5a2 Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 18 Jun 2026 14:14:53 +1000 Subject: [PATCH 04/12] feat(gateway,desktop): start cron via resolved CronScheduler provider MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 3 — rebind both ticker call sites to resolve_cron_scheduler(). Default (built-in) path is byte-identical; Phase 0 characterization tests + the full gateway suite (6919) stay green. Task 3.1: split gateway/run.py _start_cron_ticker into: - _start_gateway_housekeeping() — the gateway-only chores (channel-dir refresh, image/doc cache cleanup, paste sweep, curator poll), now on their own loop/thread, independent of which cron provider is active. - _start_cron_ticker() — kept as a DEPRECATED shim that runs only the built-in InProcessCronScheduler().start(), preserving the symbol for hermes_cli/debug.py and the Phase 0 characterization test. 
Task 3.2: start_gateway() resolves the provider and runs provider.start() in the 'cron-scheduler' thread, plus a second 'gateway-housekeeping' thread; teardown sets the shared cron_stop, calls provider.stop(), joins both. Task 3.3: desktop _start_desktop_cron_ticker() swapped its inline tick loop for resolve_cron_scheduler().start() (no adapters/loop — desktop has none). The provider owns ONLY the cron tick (so an external scale-to-zero provider with no 60s loop fits); gateway housekeeping is decoupled from the cron trigger. Both threads share cron_stop. Verified: full tests/cron/ (453) + full tests/gateway/ (6919) green. Manual gateway smoke (Task 3.4) is operator-run, pending. --- gateway/run.py | 87 +++++++++++++++++++++++++++------------- hermes_cli/web_server.py | 25 +++++------- 2 files changed, 70 insertions(+), 42 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 4b41cfc6aec..2f5900e92f5 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -16454,21 +16454,20 @@ def _run_planned_stop_watcher( stop_event.wait(poll_interval) -def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60): - """ - Background thread that ticks the cron scheduler at a regular interval. - - Runs inside the gateway process so cronjobs fire automatically without - needing a separate `hermes cron daemon` or system cron entry. +def _start_gateway_housekeeping(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60): + """Background thread for gateway-only periodic chores (NOT cron). - When ``adapters`` and ``loop`` are provided, passes them through to the - cron delivery path so live adapters can be used for E2EE rooms. + Split out of the historical ``_start_cron_ticker`` so the cron *trigger* + can live behind the ``CronScheduler`` provider (built-in or external) while + these gateway-specific chores keep running independently of which provider + fires cron. 
An external scale-to-zero provider has no 60s loop at all, but + this housekeeping still wants its hourly cadence — so it owns its own loop. - Also refreshes the channel directory every 5 minutes and prunes the - image/audio/document cache + expired ``hermes debug share`` pastes - once per hour. + Refreshes the channel directory every 5 minutes and prunes the + image/audio/document cache + expired ``hermes debug share`` pastes once per + hour, and polls the curator hourly (its inner gate enforces the real + weekly cadence). """ - from cron.scheduler import tick as cron_tick from gateway.platforms.base import cleanup_image_cache, cleanup_document_cache from hermes_cli.debug import _sweep_expired_pastes @@ -16477,14 +16476,9 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in PASTE_SWEEP_EVERY = 60 # ticks — once per hour CURATOR_EVERY = 60 # ticks — poll hourly (inner gate handles the real cadence) - logger.info("Cron ticker started (interval=%ds)", interval) + logger.info("Gateway housekeeping started (interval=%ds)", interval) tick_count = 0 while not stop_event.is_set(): - try: - cron_tick(verbose=False, adapters=adapters, loop=loop, sync=False) - except Exception as e: - logger.debug("Cron tick error: %s", e) - tick_count += 1 if tick_count % CHANNEL_DIR_EVERY == 0 and adapters: @@ -16492,9 +16486,9 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in from gateway.channel_directory import build_channel_directory if loop is not None: # build_channel_directory is async (Slack web calls), and - # this ticker runs in a background thread. Schedule onto - # the gateway event loop and wait briefly for completion - # so refresh failures are still logged via the except. + # this runs in a background thread. Schedule onto the + # gateway event loop and wait briefly for completion so + # refresh failures are still logged via the except. 
fut = safe_schedule_threadsafe( build_channel_directory(adapters), loop, logger=logger, @@ -16530,7 +16524,7 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in except Exception as e: logger.debug("Paste sweep error: %s", e) - # Curator — piggy-back on the existing cron ticker so long-running + # Curator — piggy-back on the housekeeping loop so long-running # gateways get weekly skill maintenance without needing restarts. # maybe_run_curator() is internally gated by config.interval_hours # (7 days by default), so CURATOR_EVERY is just the poll rate — the @@ -16546,7 +16540,22 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in logger.debug("Curator tick error: %s", e) stop_event.wait(timeout=interval) - logger.info("Cron ticker stopped") + logger.info("Gateway housekeeping stopped") + + +def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60): + """DEPRECATED shim — preserved for backward compatibility. + + The cron trigger now lives behind the ``CronScheduler`` provider + (``cron.scheduler_provider``); the gateway resolves a provider and runs its + ``start()`` directly (see ``start_gateway``). This shim runs ONLY the + built-in in-process tick loop, exactly as before, for any external caller + or test that still references this symbol (e.g. hermes_cli/debug.py). It no + longer runs gateway housekeeping — that moved to + ``_start_gateway_housekeeping``. 
+ """ + from cron.scheduler_provider import InProcessCronScheduler + InProcessCronScheduler().start(stop_event, adapters=adapters, loop=loop, interval=interval) async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = False, verbosity: Optional[int] = 0) -> bool: @@ -16942,17 +16951,34 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = logger.error("Gateway exiting cleanly: %s", runner.exit_reason) return True - # Start background cron ticker so scheduled jobs fire automatically. - # Pass the event loop so cron delivery can use live adapters (E2EE support). + # Start the background cron scheduler via the resolved provider so + # scheduled jobs fire automatically. The built-in provider is the + # historical in-process 60s ticker; an external provider (e.g. chronos) + # may arm a schedule and return. Pass the event loop so cron delivery can + # use live adapters (E2EE support). + from cron.scheduler_provider import resolve_cron_scheduler cron_stop = threading.Event() + cron_provider = resolve_cron_scheduler() cron_thread = threading.Thread( - target=_start_cron_ticker, + target=cron_provider.start, args=(cron_stop,), kwargs={"adapters": runner.adapters, "loop": asyncio.get_running_loop()}, daemon=True, - name="cron-ticker", + name="cron-scheduler", ) cron_thread.start() + + # Gateway-only periodic housekeeping (channel dir, cache cleanup, paste + # sweep, curator) — runs independently of which cron provider is active. + # Shares cron_stop as the shutdown signal. 
+ housekeeping_thread = threading.Thread( + target=_start_gateway_housekeeping, + args=(cron_stop,), + kwargs={"adapters": runner.adapters, "loop": asyncio.get_running_loop()}, + daemon=True, + name="gateway-housekeeping", + ) + housekeeping_thread.start() # Wait for shutdown await runner.wait_for_shutdown() @@ -16962,9 +16988,14 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = logger.error("Gateway exiting with failure: %s", runner.exit_reason) return False - # Stop cron ticker cleanly + # Stop cron scheduler + housekeeping cleanly cron_stop.set() + try: + cron_provider.stop() + except Exception as e: + logger.debug("Cron provider stop() error: %s", e) cron_thread.join(timeout=5) + housekeeping_thread.join(timeout=5) # Stop the planned-stop watcher (daemon=True so this is belt-and-suspenders). _planned_stop_watcher_stop.set() diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index 70f39162cf8..768084eba36 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -113,23 +113,20 @@ def _start_desktop_cron_ticker(stop_event: "threading.Event", interval: int = 60 The scheduler tick loop normally lives in ``hermes gateway run`` — but the desktop app spawns a ``hermes dashboard`` backend, not a gateway, so a cron - a user creates in the app would never fire. We run a minimal ticker here - (no live adapters; delivery falls back to the per-platform send path). + a user creates in the app would never fire. We run the resolved cron + scheduler provider here (no live adapters; delivery falls back to the + per-platform send path). - Cross-process safe: ``cron.scheduler.tick`` takes the ``cron/.tick.lock`` - file lock, so this never double-fires alongside a real gateway on the same - HERMES_HOME — whichever process grabs the lock first wins the tick. 
+ Cross-process safe: the built-in provider's ``cron.scheduler.tick`` takes + the ``cron/.tick.lock`` file lock, so this never double-fires alongside a + real gateway on the same HERMES_HOME — whichever process grabs the lock + first wins the tick. """ - from cron.scheduler import tick as cron_tick + from cron.scheduler_provider import resolve_cron_scheduler - _log.info("Desktop cron ticker started (interval=%ds)", interval) - # Tick once up front (catches jobs due at launch), then on the interval. - while not stop_event.is_set(): - try: - cron_tick(verbose=False, sync=False) - except Exception as e: - _log.debug("Desktop cron tick error: %s", e) - stop_event.wait(interval) + provider = resolve_cron_scheduler() + _log.info("Desktop cron scheduler started (provider=%s, interval=%ds)", provider.name, interval) + provider.start(stop_event, interval=interval) @asynccontextmanager From bfb6e0bb33e61cef064ab5b41f91716bc02a474b Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 18 Jun 2026 14:18:31 +1000 Subject: [PATCH 05/12] docs(cron): document CronScheduler provider + cron.provider key Phase 3.5. cron-internals.md gateway-integration section now describes the pluggable trigger (resolve_cron_scheduler, built-in default, plugins/cron discovery, the never-without-a-trigger fallback, and the trigger-vs-execution split). cli-commands.md notes cron.provider near the hermes cron entry. 
--- .../docs/developer-guide/cron-internals.md | 25 ++++++++++++++++++- website/docs/reference/cli-commands.md | 7 ++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/website/docs/developer-guide/cron-internals.md b/website/docs/developer-guide/cron-internals.md index bad59645dbc..c895d339b09 100644 --- a/website/docs/developer-guide/cron-internals.md +++ b/website/docs/developer-guide/cron-internals.md @@ -102,7 +102,30 @@ tick() ### Gateway Integration -In gateway mode, the scheduler runs in a dedicated background thread (`_start_cron_ticker` in `gateway/run.py`) that calls `scheduler.tick()` every 60 seconds alongside message handling. +In gateway mode, the cron **trigger** (the part that decides *when* a due job +fires — "Axis B") is selected through a pluggable `CronScheduler` provider. The +gateway calls `resolve_cron_scheduler()` (`cron/scheduler_provider.py`) and runs +the resolved provider's `start()` in a dedicated background thread, alongside a +separate gateway-housekeeping thread. + +The active provider is chosen by the `cron.provider` config key: + +- **empty (default)** → the built-in `InProcessCronScheduler`, which runs the + historical in-process loop calling `scheduler.tick()` every 60 seconds. This + is byte-identical to the pre-provider behavior. +- **a named provider** (e.g. `chronos`, a managed-cron provider for + scale-to-zero deployments) → discovered from `plugins/cron/<name>/` or + `$HERMES_HOME/plugins/<name>/`. + +If a named provider is missing, fails to load, or reports `is_available() == +False`, the resolver falls back to the built-in with a warning — **cron is +never left without a trigger.** The built-in provider lives in core +(`cron/scheduler_provider.py`), not in `plugins/`, so the fallback can't be +accidentally removed. + +What "firing" *means* (job execution + delivery) is unchanged and shared by all +providers — it stays in `scheduler.run_job()` / `scheduler._deliver_result()`.
+A provider only controls the trigger, never execution. In CLI mode, cron jobs only fire when `hermes cron` commands are run or during active CLI sessions. diff --git a/website/docs/reference/cli-commands.md b/website/docs/reference/cli-commands.md index 3071ac0e5fc..f0fe67d4349 100644 --- a/website/docs/reference/cli-commands.md +++ b/website/docs/reference/cli-commands.md @@ -533,6 +533,13 @@ hermes cron | `status` | Check whether the cron scheduler is running. | | `tick` | Run due jobs once and exit. | +The cron **trigger** is pluggable via the `cron.provider` config key. Empty +(the default) uses the built-in in-process ticker. A named provider (e.g. +`chronos`, a managed-cron provider for scale-to-zero deployments) is discovered +from `plugins/cron/<name>/` or `$HERMES_HOME/plugins/<name>/`; an unknown or +unavailable provider falls back to the built-in, so cron is never left without +a trigger. See the [cron internals](../developer-guide/cron-internals.md#gateway-integration) doc. + ## `hermes kanban` ```bash From 58b19a4f6988f2fda2cddb5c620628afce750a36 Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 18 Jun 2026 14:26:29 +1000 Subject: [PATCH 06/12] refactor(cron): extract run_one_job shared firing helper from tick MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 4A. Factor tick's per-job closure (_process_job: execute → save → deliver → mark) into a module-level run_one_job(job, *, adapters, loop, verbose) so the external Chronos provider's fire_due (Phase 4D) reuses the IDENTICAL body — no duplicated correctness. tick's _process_job is now a thin wrapper calling run_one_job; the pool/in-flight-guard/contextvars dispatch logic is unchanged. run_one_job fires ONE given job; it does NOT decide due-ness, claim, or compute next_run (tick advances next_run_at under the file lock; an external provider claims via the store CAS in Phase 4C). Pure refactor, no behavior change.
TDD: test_run_one_job.py characterizes the sequence through tick() first (test_tick_process_job_sequence, passed pre-extraction), then unit-tests the helper directly: success sequence, [SILENT]→skip delivery, empty-response soft failure (#8585), failed-job-still-delivers, exception→mark-failed. Verified: tests/cron/ 459 passed (was 453 + 6 new); tick behavior unchanged. --- cron/scheduler.py | 105 +++++++++++++++++------------ tests/cron/test_run_one_job.py | 119 +++++++++++++++++++++++++++++++++ 2 files changed, 182 insertions(+), 42 deletions(-) create mode 100644 tests/cron/test_run_one_job.py diff --git a/cron/scheduler.py b/cron/scheduler.py index 35906996619..9bab59456ea 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -1967,6 +1967,64 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: logger.debug("Job '%s': failed to reap stale auxiliary clients: %s", job_id, e) +def run_one_job(job: dict, *, adapters=None, loop=None, verbose: bool = False) -> bool: + """Run ONE due job end-to-end: execute → save output → deliver → mark. + + This is the shared firing body extracted from ``tick``'s per-job closure so + that BOTH the built-in ticker and an external provider's ``fire_due`` (e.g. + Chronos) run the identical sequence — no duplicated correctness. + + It does NOT decide whether the job is due, claim it, or compute the next + run — those are the caller's concern (``tick`` advances ``next_run_at`` + under the file lock before dispatch; an external provider claims via the + store CAS). This function only fires the given job once. + + Returns True if the job was processed (even if the job itself failed — + failure is recorded via ``mark_job_run``), False only if processing raised. + """ + try: + success, output, final_response, error = run_job(job) + + output_file = save_job_output(job["id"], output) + if verbose: + logger.info("Output saved to: %s", output_file) + + # Deliver the final response to the origin/target chat. 
+ # If the agent responded with [SILENT], skip delivery (but + # output is already saved above). Failed jobs always deliver. + deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}" + # Treat whitespace-only final responses the same as empty + # responses: do not deliver a blank message, and let the + # empty-response guard below mark the run as a soft failure. + should_deliver = bool(deliver_content.strip()) + if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper(): + logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER) + should_deliver = False + + delivery_error = None + if should_deliver: + try: + delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop) + except Exception as de: + delivery_error = str(de) + logger.error("Delivery failed for job %s: %s", job["id"], de) + + # Treat empty final_response as a soft failure so last_status + # is not "ok" — the agent ran but produced nothing useful. + # (issue #8585) + if success and not final_response.strip(): + success = False + error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)" + + mark_job_run(job["id"], success, error, delivery_error=delivery_error) + return True + + except Exception as e: + logger.error("Error processing job %s: %s", job['id'], e) + mark_job_run(job["id"], False, str(e)) + return False + + def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> int: """ Check and run all due jobs. 
@@ -2045,48 +2103,11 @@ def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> i ) def _process_job(job: dict) -> bool: - """Run one due job end-to-end: execute, save, deliver, mark.""" - try: - success, output, final_response, error = run_job(job) - - output_file = save_job_output(job["id"], output) - if verbose: - logger.info("Output saved to: %s", output_file) - - # Deliver the final response to the origin/target chat. - # If the agent responded with [SILENT], skip delivery (but - # output is already saved above). Failed jobs always deliver. - deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}" - # Treat whitespace-only final responses the same as empty - # responses: do not deliver a blank message, and let the - # empty-response guard below mark the run as a soft failure. - should_deliver = bool(deliver_content.strip()) - if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper(): - logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER) - should_deliver = False - - delivery_error = None - if should_deliver: - try: - delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop) - except Exception as de: - delivery_error = str(de) - logger.error("Delivery failed for job %s: %s", job["id"], de) - - # Treat empty final_response as a soft failure so last_status - # is not "ok" — the agent ran but produced nothing useful. - # (issue #8585) - if success and not final_response.strip(): - success = False - error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)" - - mark_job_run(job["id"], success, error, delivery_error=delivery_error) - return True - - except Exception as e: - logger.error("Error processing job %s: %s", job['id'], e) - mark_job_run(job["id"], False, str(e)) - return False + """Run one due job end-to-end. 
Thin wrapper around the shared + module-level ``run_one_job`` so ``tick`` and external providers + (Chronos ``fire_due``) use the identical execute→save→deliver→mark + body.""" + return run_one_job(job, adapters=adapters, loop=loop, verbose=verbose) # Partition due jobs: those with a per-job workdir mutate # os.environ["TERMINAL_CWD"] inside run_job, which is process-global — diff --git a/tests/cron/test_run_one_job.py b/tests/cron/test_run_one_job.py new file mode 100644 index 00000000000..7da6b1c14f4 --- /dev/null +++ b/tests/cron/test_run_one_job.py @@ -0,0 +1,119 @@ +"""Characterization + unit tests for the `run_one_job` shared helper (Phase 4A). + +`tick`'s per-job body (`_process_job`) is the execute → save → deliver → mark +sequence that fires ONE due job. Phase 4A extracts it into a module-level +`run_one_job(job, *, adapters=None, loop=None, verbose=False)` so the external +Chronos provider's `fire_due` can reuse the IDENTICAL body — no duplicated +correctness. + +The first test characterizes the sequence as driven through `tick()` (proving +the extraction didn't change `tick`'s behavior); the rest unit-test the +extracted helper directly. 
+""" +import cron.scheduler as s + + +def _patch_pipeline(monkeypatch, *, success=True, output="out", final="final response", + error=None, silent_marker_in=None): + """Patch the job pipeline primitives and record the call order.""" + calls = [] + + def fake_run_job(job): + calls.append(("run_job", job["id"])) + fr = final if silent_marker_in is None else silent_marker_in + return (success, output, fr, error) + + def fake_save(jid, out): + calls.append(("save", jid)) + return f"/tmp/{jid}.txt" + + def fake_deliver(job, content, adapters=None, loop=None): + calls.append(("deliver", job["id"])) + return None + + def fake_mark(jid, ok, err=None, delivery_error=None): + calls.append(("mark", jid, ok)) + + monkeypatch.setattr(s, "run_job", fake_run_job) + monkeypatch.setattr(s, "save_job_output", fake_save) + monkeypatch.setattr(s, "_deliver_result", fake_deliver) + monkeypatch.setattr(s, "mark_job_run", fake_mark) + return calls + + +def test_tick_process_job_sequence(monkeypatch): + """Characterization: a single due job driven through tick() runs the + sequence run_job → save → deliver → mark, in that order.""" + calls = _patch_pipeline(monkeypatch) + monkeypatch.setattr(s, "get_due_jobs", lambda: [{"id": "j1", "name": "t"}]) + monkeypatch.setattr(s, "advance_next_run", lambda jid: True) + + s.tick(verbose=False, sync=True) + + assert [c[0] for c in calls] == ["run_job", "save", "deliver", "mark"] + assert calls[-1] == ("mark", "j1", True) + + +def test_run_one_job_success_sequence(monkeypatch): + """The extracted helper runs the same execute→save→deliver→mark sequence + for a successful job.""" + calls = _patch_pipeline(monkeypatch) + + ok = s.run_one_job({"id": "j2", "name": "t"}) + + assert ok is True + assert [c[0] for c in calls] == ["run_job", "save", "deliver", "mark"] + assert calls[-1] == ("mark", "j2", True) + + +def test_run_one_job_silent_skips_delivery(monkeypatch): + """A [SILENT] final response saves output + marks the run but does NOT + deliver.""" + 
calls = _patch_pipeline(monkeypatch, silent_marker_in="[SILENT]") + + s.run_one_job({"id": "j3", "name": "t"}) + + kinds = [c[0] for c in calls] + assert "run_job" in kinds and "save" in kinds and "mark" in kinds + assert "deliver" not in kinds + + +def test_run_one_job_empty_response_is_soft_failure(monkeypatch): + """An empty final response marks the run as NOT ok (issue #8585).""" + calls = _patch_pipeline(monkeypatch, final=" ") + + s.run_one_job({"id": "j4", "name": "t"}) + + mark = [c for c in calls if c[0] == "mark"][0] + assert mark == ("mark", "j4", False) + + +def test_run_one_job_failed_job_delivers_error(monkeypatch): + """A failed job still delivers (the error notice) and marks not-ok.""" + calls = _patch_pipeline(monkeypatch, success=False, final="", error="boom") + + s.run_one_job({"id": "j5", "name": "t"}) + + kinds = [c[0] for c in calls] + assert "deliver" in kinds # failures always deliver + mark = [c for c in calls if c[0] == "mark"][0] + assert mark == ("mark", "j5", False) + + +def test_run_one_job_exception_marks_failure(monkeypatch): + """If run_job raises, the helper marks the run failed and returns False + rather than propagating.""" + def boom(job): + raise RuntimeError("kaboom") + + monkeypatch.setattr(s, "run_job", boom) + marks = [] + monkeypatch.setattr( + s, "mark_job_run", + lambda jid, ok, err=None, delivery_error=None: marks.append((jid, ok)), + ) + + ok = s.run_one_job({"id": "j6", "name": "t"}) + + assert ok is False + assert marks == [("j6", False)] From 6ff5fd373b6695b1ed7b7e0f63fde6a8430d16e6 Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 18 Jun 2026 14:30:31 +1000 Subject: [PATCH 07/12] feat(cron): additive CronScheduler hooks (on_jobs_changed/fire_due/reconcile) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 4B. 
Three NON-abstract hooks on the CronScheduler ABC, all with built-in-safe defaults so the built-in inherits them without overriding and test_abc_growth_stays_additive stays green (required surface still {name, start}): - on_jobs_changed(): post-mutation reconcile hook. Built-in no-op. - fire_due(job_id): claim the job via the store CAS (claim_job_for_fire, Phase 4C) then run it through the shared run_one_job (Phase 4A). Returns False if the claim is lost or the job vanished (repeat-N exhausted between arm and fire). The inbound webhook (Phase 4E) routes here. - reconcile(): converge the external registry toward jobs.json. Built-in no-op. fire_due imports claim_job_for_fire/get_job/run_one_job INSIDE the method, so this commits cleanly before Phase 4C lands claim_job_for_fire (import-time is unaffected; tests monkeypatch it with raising=False). Tests: required-surface-unchanged guard, built-in inherits no-op defaults, and fire_due's three paths (claim+run, lost-claim→no-run, missing-job→no-run). tests/cron/ green (20 in test_scheduler_provider.py). --- cron/scheduler_provider.py | 39 +++++++++++++++ tests/cron/test_scheduler_provider.py | 70 +++++++++++++++++++++++++++ 2 files changed, 109 insertions(+) diff --git a/cron/scheduler_provider.py b/cron/scheduler_provider.py index 45243e7749c..50bca6b892b 100644 --- a/cron/scheduler_provider.py +++ b/cron/scheduler_provider.py @@ -71,6 +71,45 @@ class CronScheduler(ABC): resources (queue consumers, HTTP servers).""" return None + # --- Optional hooks for external providers (added Phase 4). -------------- + # All default-safe so the built-in inherits working behavior without + # overriding. Keep these NON-abstract — see test_abc_growth_stays_additive. + + def on_jobs_changed(self) -> None: + """Called after a successful store mutation (create/update/remove/ + pause/resume). External providers reconcile their registry here (e.g. + Chronos re-provisions/cancels the affected one-shot via NAS). 
+ Built-in: no-op (it re-reads jobs.json on every tick).""" + return None + + def fire_due(self, job_id: str, *, adapters: Any = None, loop: Any = None) -> bool: + """Run a single job NOW via the shared orchestrator. Called by the + inbound fire webhook when an external scheduler signals a job is due. + + The default claims the job with a store-level compare-and-set + (multi-machine at-most-once), then runs it via the shared + ``run_one_job`` body. Built-in never calls this (it has its own tick + loop); an external provider routes its inbound fire here. + + Returns True if THIS caller claimed and ran the job, False if the claim + was lost (another machine/retry won it) or the job no longer exists. + """ + from cron.jobs import claim_job_for_fire, get_job + from cron.scheduler import run_one_job + + if not claim_job_for_fire(job_id): + return False # another machine already claimed this fire + job = get_job(job_id) + if job is None: + return False # job removed (e.g. repeat-N exhausted) between arm and fire + return run_one_job(job, adapters=adapters, loop=loop) + + def reconcile(self) -> None: + """Converge the external registry toward jobs.json (the desired state): + arm missing one-shots, cancel orphaned ones, re-arm changed times. + Built-in: no-op.""" + return None + def resolve_cron_scheduler() -> "CronScheduler": """Return the active cron scheduler provider. 
diff --git a/tests/cron/test_scheduler_provider.py b/tests/cron/test_scheduler_provider.py index 8fdbb305a0f..2b2e159e2a3 100644 --- a/tests/cron/test_scheduler_provider.py +++ b/tests/cron/test_scheduler_provider.py @@ -262,3 +262,73 @@ def test_resolve_available_provider_is_used(monkeypatch): monkeypatch.setattr(pc, "load_cron_scheduler", lambda n: Fake()) prov = sp.resolve_cron_scheduler() assert prov.name == "fake" + + +# ── Phase 4B: additive hooks (on_jobs_changed / fire_due / reconcile) ──────── + + +def test_hooks_did_not_change_required_surface(): + """The additive hooks must NOT become abstractmethods — the Phase-1 guard + still holds (required surface is exactly name + start).""" + from cron.scheduler_provider import CronScheduler + + assert set(CronScheduler.__abstractmethods__) == {"name", "start"} + + +def test_builtin_inherits_hook_defaults(): + """The built-in inherits no-op defaults for the new hooks (it never needs + to override them).""" + from cron.scheduler_provider import InProcessCronScheduler + + p = InProcessCronScheduler() + assert p.on_jobs_changed() is None + assert p.reconcile() is None + # built-in does not override fire_due; it simply isn't called for built-in. 
+ assert hasattr(p, "fire_due") + + +def test_fire_due_default_claims_then_runs(monkeypatch): + """The default fire_due claims via the store CAS, fetches the job, and runs + it through the shared run_one_job body.""" + import cron.jobs as jobs + import cron.scheduler as sched + from cron.scheduler_provider import InProcessCronScheduler + + ran = [] + monkeypatch.setattr(jobs, "claim_job_for_fire", lambda jid: True, raising=False) + monkeypatch.setattr(jobs, "get_job", lambda jid: {"id": jid, "name": "t"}) + monkeypatch.setattr(sched, "run_one_job", lambda job, **kw: ran.append(job["id"]) or True) + + assert InProcessCronScheduler().fire_due("j1") is True + assert ran == ["j1"] + + +def test_fire_due_lost_claim_does_not_run(monkeypatch): + """If the CAS claim is lost (another machine/retry won), fire_due returns + False and never runs the job.""" + import cron.jobs as jobs + import cron.scheduler as sched + from cron.scheduler_provider import InProcessCronScheduler + + ran = [] + monkeypatch.setattr(jobs, "claim_job_for_fire", lambda jid: False, raising=False) + monkeypatch.setattr(sched, "run_one_job", lambda job, **kw: ran.append(job["id"]) or True) + + assert InProcessCronScheduler().fire_due("j1") is False + assert ran == [] + + +def test_fire_due_missing_job_does_not_run(monkeypatch): + """If the job vanished between arm and fire (e.g. 
repeat-N exhausted), + fire_due returns False without running.""" + import cron.jobs as jobs + import cron.scheduler as sched + from cron.scheduler_provider import InProcessCronScheduler + + ran = [] + monkeypatch.setattr(jobs, "claim_job_for_fire", lambda jid: True, raising=False) + monkeypatch.setattr(jobs, "get_job", lambda jid: None) + monkeypatch.setattr(sched, "run_one_job", lambda job, **kw: ran.append(job["id"]) or True) + + assert InProcessCronScheduler().fire_due("gone") is False + assert ran == [] From b01eee0c77e182f1c6f9d101c5851fbe4b5efae3 Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 18 Jun 2026 14:34:34 +1000 Subject: [PATCH 08/12] feat(cron): store-level CAS claim for multi-machine at-most-once fire MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 4C. claim_job_for_fire(job_id, *, claim_ttl_seconds=300) in cron/jobs.py: under the existing _jobs_lock() file lock, claim a job for a single external fire so that across N gateway replicas exactly ONE wins. Single-machine deployments always win (unaffected). Semantics: - missing / disabled / paused job → False. - a fresh fire_claim (younger than claim_ttl_seconds) already present → False (someone else holds it). Stale claim (crashed winner) → overwrite, so a job is never wedged forever. - on win: stamp fire_claim={at, by:_machine_id()}; for recurring (cron/interval) advance next_run_at (mirrors advance_next_run's at-most-once bump so a stale re-delivery can't re-fire); one-shots keep next_run_at but the fresh claim blocks a duplicate retry for the same fire. - mark_job_run now clears fire_claim on completion so a re-armed recurring job is claimable again next fire. _machine_id() (HERMES_MACHINE_ID env, else hostname:pid) is attribution-only; correctness is the file lock + fresh-claim check, not the id. This is consumed by CronScheduler.fire_due (Phase 4B). tick is untouched — it still uses advance_next_run, so the built-in single-machine path is unaffected. 
Tests (real store, temp HERMES_HOME): claim-once-then-block + next_run advance, one-shot no-double-claim, unknown→False, paused→False, stale-claim reclaimable, mark_job_run clears the claim (recurring re-claimable). tests/cron/ 470 passed. --- cron/jobs.py | 68 ++++++++++++++++++++++ tests/cron/test_claim_job_for_fire.py | 84 +++++++++++++++++++++++++++ 2 files changed, 152 insertions(+) create mode 100644 tests/cron/test_claim_job_for_fire.py diff --git a/cron/jobs.py b/cron/jobs.py index 178bd0fad81..2f44608d649 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -976,6 +976,9 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None, job["last_error"] = error if not success else None # Track delivery failures separately — cleared on successful delivery job["last_delivery_error"] = delivery_error + # Clear any external-fire claim so a re-armed recurring job can + # be claimed again on its next fire (Phase 4C CAS). + job["fire_claim"] = None # Increment completed count if job.get("repeat"): @@ -1057,6 +1060,71 @@ def advance_next_run(job_id: str) -> bool: return False +def _machine_id() -> str: + """Stable-ish identifier for claim attribution/debugging (NOT correctness). + + Uses ``HERMES_MACHINE_ID`` if set, else hostname + pid. The CAS correctness + comes from the file lock + the fresh-claim check, not from this value. + """ + explicit = os.getenv("HERMES_MACHINE_ID", "").strip() + if explicit: + return explicit + try: + import socket + host = socket.gethostname() + except Exception: + host = "unknown" + return f"{host}:{os.getpid()}" + + +def claim_job_for_fire(job_id: str, *, claim_ttl_seconds: int = 300) -> bool: + """Atomically claim a job for a single external 'fire' (multi-machine + at-most-once). Returns True iff THIS caller won the claim. + + Used by the external-provider fire path (``CronScheduler.fire_due``) when an + external scheduler (Chronos) signals a job is due across N gateway replicas: + exactly one wins. 
Single-machine deployments always win. + + Under the file lock: reject if the job is missing/disabled/paused. If a + fresh claim (younger than ``claim_ttl_seconds``) already exists, lose. + Otherwise stamp a ``fire_claim`` and, for recurring jobs, advance + ``next_run_at`` (mirrors ``advance_next_run``'s at-most-once bump so a stale + re-delivery for the old time can't re-fire). One-shots keep ``next_run_at`` + but the fresh ``fire_claim`` blocks a duplicate retry for the same fire. + ``mark_job_run`` clears the claim on completion so a re-armed recurring job + is claimable again next fire. + + The stale-claim TTL means a machine that crashed after claiming but before + completing doesn't wedge the job forever — after the TTL another fire can + reclaim it. + """ + with _jobs_lock(): + jobs = load_jobs() + for job in jobs: + if job["id"] != job_id: + continue + if not job.get("enabled", True) or job.get("state") == "paused": + return False + now = _hermes_now() + existing = job.get("fire_claim") + if existing: + try: + claimed_at = _ensure_aware(datetime.fromisoformat(existing["at"])) + if (now - claimed_at).total_seconds() < claim_ttl_seconds: + return False # someone holds a fresh claim + except Exception: + pass # malformed claim → overwrite + job["fire_claim"] = {"at": now.isoformat(), "by": _machine_id()} + kind = job.get("schedule", {}).get("kind") + if kind in {"cron", "interval"}: + nxt = compute_next_run(job["schedule"], now.isoformat()) + if nxt: + job["next_run_at"] = nxt + save_jobs(jobs) + return True + return False + + def get_due_jobs() -> List[Dict[str, Any]]: """Get all jobs that are due to run now. diff --git a/tests/cron/test_claim_job_for_fire.py b/tests/cron/test_claim_job_for_fire.py new file mode 100644 index 00000000000..abbe969eb04 --- /dev/null +++ b/tests/cron/test_claim_job_for_fire.py @@ -0,0 +1,84 @@ +"""Tests for the store-level CAS fire claim (Phase 4C). 
+ +`claim_job_for_fire` gives multi-machine at-most-once semantics when an external +scheduler (Chronos) fires a job: across N gateway replicas, exactly ONE wins the +claim for a given fire. Single-machine deployments always win (unaffected). + +These exercise the real store against a temp HERMES_HOME (no mocks) per the +E2E-over-mocks discipline for file-touching code. +""" +import pytest + + +@pytest.fixture +def temp_home(tmp_path, monkeypatch): + """Isolated HERMES_HOME so jobs.json doesn't touch the real store.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + # cron.jobs caches no home at import; get_hermes_home() reads the env live. + yield tmp_path + + +def test_claim_succeeds_once_then_blocks(temp_home): + """First claim for a fire wins; a second claim for the same fire loses, and + next_run_at is advanced (a re-delivery for the old time can't re-fire).""" + from cron.jobs import create_job, claim_job_for_fire, get_job + + job = create_job(prompt="x", schedule="every 5m", name="t") + jid = job["id"] + before = get_job(jid)["next_run_at"] + + assert claim_job_for_fire(jid) is True + assert claim_job_for_fire(jid) is False + assert get_job(jid)["next_run_at"] != before + + +def test_claim_oneshot_cannot_be_double_claimed(temp_home): + """A one-shot can't be double-claimed (the fresh claim blocks the retry).""" + from cron.jobs import create_job, claim_job_for_fire + + job = create_job(prompt="x", schedule="30m", name="o") + assert claim_job_for_fire(job["id"]) is True + assert claim_job_for_fire(job["id"]) is False + + +def test_claim_unknown_job_returns_false(temp_home): + from cron.jobs import claim_job_for_fire + + assert claim_job_for_fire("nope-does-not-exist") is False + + +def test_claim_paused_job_returns_false(temp_home): + """A paused job can't be claimed.""" + from cron.jobs import create_job, claim_job_for_fire, pause_job + + job = create_job(prompt="x", schedule="every 5m", name="p") + pause_job(job["id"]) + assert 
claim_job_for_fire(job["id"]) is False + + +def test_stale_claim_is_reclaimable(temp_home, monkeypatch): + """A claim older than the TTL is overwritten — the fire isn't stuck forever + if the winning machine crashed before mark_job_run cleared the claim.""" + from cron.jobs import create_job, claim_job_for_fire + + job = create_job(prompt="x", schedule="every 5m", name="s") + jid = job["id"] + assert claim_job_for_fire(jid) is True + # With a 0s TTL, the existing claim is always considered stale. + assert claim_job_for_fire(jid, claim_ttl_seconds=0) is True + + +def test_mark_job_run_clears_claim(temp_home): + """After a recurring job completes, its claim is cleared so the next fire + can be claimed again.""" + from cron.jobs import create_job, claim_job_for_fire, mark_job_run, get_job + + job = create_job(prompt="x", schedule="every 5m", name="c") + jid = job["id"] + assert claim_job_for_fire(jid) is True + assert get_job(jid).get("fire_claim") is not None + + mark_job_run(jid, success=True) + assert get_job(jid).get("fire_claim") is None + # …and the re-armed recurring job is claimable again. + assert claim_job_for_fire(jid) is True From 4c8bbe6416966fccc8663be0c4049121d2af5f07 Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 18 Jun 2026 14:40:56 +1000 Subject: [PATCH 09/12] feat(cron): Chronos NAS-mediated managed-cron provider (scale-to-zero) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 4D. The first non-default CronScheduler: plugins/cron/chronos/. Inert unless cron.provider=chronos; resolve_cron_scheduler falls back to the built-in if unavailable, so cron never loses its trigger. Files: - chronos/__init__.py — ChronosCronScheduler + register(ctx). * is_available(): config-only, NO network (portal_url + callback_url + a stored Nous access token via get_provider_auth_state). Returns False → resolver falls back to built-in. 
* start(): reconcile() then RETURN — no blocking loop, no 60s wake (DQ-1: this is what makes scale-to-zero real; the machine wakes only on a NAS→agent fire). * _arm_one_shot(job): POST NAS provision {job_id, fire_at, agent_callback_url, dedup_key=job_id:fire_at}. Agent owns the time → sub-minute fires survive (no scheduler 1-minute floor). * reconcile(): converge NAS arms toward jobs.json — arm missing/changed-time, cancel orphaned, skip paused. Cold process rebuilds from jobs.json + idempotent dedup_key. * on_jobs_changed(): reconcile (re-arm/cancel the affected one-shot). * fire_due(): ABC default (CAS claim + run_one_job) THEN re-arm the next one-shot. Job gone (one-shot done / repeat-N exhausted) → no re-arm. - chronos/_nas_client.py — thin HTTP wrapper for provision/cancel/list using the agent's existing refresh-aware Nous token (resolve_nous_access_token). Names no scheduler vendor; holds no scheduler creds. - chronos/plugin.yaml — discovery metadata. INVARIANT: zero "qstash"/"upstash" hits in plugins/cron, gateway, hermes_cli, website/docs — the external scheduler is a NAS-internal detail, never named agent-side. Tests (13, all NAS mocked, zero network): is_available off-without-config + on-with-config + makes-no-network; arm payload incl. sub-minute + noop without next_run; reconcile arms-all / cancels-orphan / skips-paused / skips-already- armed; fire_due re-arms next / no re-arm when job gone / no re-arm when claim lost. 
--- plugins/cron/chronos/__init__.py | 241 ++++++++++++++++++++++++++++ plugins/cron/chronos/_nas_client.py | 123 ++++++++++++++ plugins/cron/chronos/plugin.yaml | 9 ++ tests/plugins/test_chronos_cron.py | 203 +++++++++++++++++++++++ 4 files changed, 576 insertions(+) create mode 100644 plugins/cron/chronos/__init__.py create mode 100644 plugins/cron/chronos/_nas_client.py create mode 100644 plugins/cron/chronos/plugin.yaml create mode 100644 tests/plugins/test_chronos_cron.py diff --git a/plugins/cron/chronos/__init__.py b/plugins/cron/chronos/__init__.py new file mode 100644 index 00000000000..1ec5a457763 --- /dev/null +++ b/plugins/cron/chronos/__init__.py @@ -0,0 +1,241 @@ +"""Chronos — NAS-mediated managed cron provider (scale-to-zero). + +Chronos (the Greek god of time, alongside Hermes) is the first non-default +``CronScheduler``. It lets a hosted gateway scale to zero while idle and still +fire cron jobs: instead of a 60s in-process ticker, it asks NAS to arm exactly +one external one-shot per job at that job's real next-fire time. NAS calls the +agent back at fire time over an authenticated webhook (``/api/cron/fire``); the +agent runs the job via the shared ``run_one_job`` body and re-arms the next +one-shot. + +The external scheduler NAS uses is an internal NAS implementation detail — +Chronos names no vendor, holds no scheduler credentials, and speaks only to +NAS's ``agent-cron`` endpoints with the agent's existing Nous token. + +Design constraints (see the plan's DQ-1): + - start() arms all enabled jobs and RETURNS; it never blocks and never spawns + a periodic wake. Between fires the machine is truly at zero. + - reconcile runs only on a warm process (start / on_jobs_changed / piggybacked + on a fire), never as a periodic wake of a sleeping machine. + +Inert unless ``cron.provider: chronos``. ``resolve_cron_scheduler`` falls back +to the built-in if Chronos is unavailable, so cron never loses its trigger. 
+ +Wire contract: ``docs/chronos-managed-cron-contract.md``. +""" + +from __future__ import annotations + +import logging +import threading +from typing import Any, Dict, Optional + +from cron.scheduler_provider import CronScheduler + +logger = logging.getLogger("cron.chronos") + + +def _cfg(*keys: str, default: Any = "") -> Any: + """Read a cron.chronos.* config value (no network).""" + try: + from hermes_cli.config import cfg_get, load_config + return cfg_get(load_config(), *keys, default=default) + except Exception: + return default + + +class ChronosCronScheduler(CronScheduler): + """NAS-mediated external cron provider.""" + + def __init__(self) -> None: + # In-memory map of job_id → fire_at we've asked NAS to arm. Best-effort + # cache; reconcile rebuilds desired state from jobs.json, so a cold + # process simply re-arms (idempotent via dedup_key). + self._armed: Dict[str, str] = {} + self._lock = threading.Lock() + self._client = None # lazily constructed (no network in is_available) + + # -- identity / availability ----------------------------------------- + + @property + def name(self) -> str: + return "chronos" + + def is_available(self) -> bool: + """Config presence only — NO network. + + Chronos needs a portal base URL, the agent's own publicly-reachable + callback URL (for NAS→agent fires), and a usable Nous token (the agent + is logged into the portal). If any is missing, resolve_cron_scheduler + falls back to the built-in ticker. + """ + if not (_cfg("cron", "chronos", "portal_url") and _cfg("cron", "chronos", "callback_url")): + return False + return self._have_nous_token() + + def _have_nous_token(self) -> bool: + """True if the agent has a Nous Portal login (no network call). + + Checks the stored auth state for a Nous access token — does NOT refresh + or hit the network (is_available must stay offline). The actual + refresh-aware token is resolved lazily at provision time. 
+ """ + try: + from hermes_cli.auth import get_provider_auth_state + state = get_provider_auth_state("nous") or {} + return bool(state.get("access_token")) + except Exception: + return False + + # -- client ----------------------------------------------------------- + + def _get_client(self): + if self._client is None: + from ._nas_client import NasCronClient + self._client = NasCronClient(_cfg("cron", "chronos", "portal_url")) + return self._client + + def _callback_url(self) -> str: + return str(_cfg("cron", "chronos", "callback_url") or "") + + # -- lifecycle -------------------------------------------------------- + + def start(self, stop_event, *, adapters=None, loop=None, interval=60): + """Arm all enabled jobs via NAS, then RETURN immediately. + + Does NOT block and does NOT spawn a 60s wake (DQ-1) — that is the whole + point of scale-to-zero. The machine wakes only on a NAS→agent fire. + """ + try: + self.reconcile() + except Exception as e: + logger.warning("Chronos start() reconcile failed: %s", e) + # Intentionally return — no loop, no periodic wake. + + def stop(self) -> None: + return None + + def on_jobs_changed(self) -> None: + """A job was created/updated/removed/paused/resumed — reconcile the NAS + registry so the affected one-shot is (re-)armed or cancelled.""" + try: + self.reconcile() + except Exception as e: + logger.debug("Chronos on_jobs_changed reconcile failed: %s", e) + + # -- arming ----------------------------------------------------------- + + def _arm_one_shot(self, job: Dict[str, Any]) -> None: + """Ask NAS to arm exactly one one-shot at the job's next_run_at. + + The agent computes the time; NAS+its scheduler are the dumb executor. + Idempotent per (job_id, fire_at) via dedup_key, so re-arming the same + fire is a no-op NAS-side. 
+ """ + job_id = job["id"] + fire_at = job.get("next_run_at") + if not fire_at: + return + dedup_key = f"{job_id}:{fire_at}" + self._get_client().provision( + job_id=job_id, + fire_at=fire_at, + agent_callback_url=self._callback_url(), + dedup_key=dedup_key, + ) + with self._lock: + self._armed[job_id] = fire_at + + def _cancel(self, job_id: str) -> None: + try: + self._get_client().cancel(job_id=job_id) + finally: + with self._lock: + self._armed.pop(job_id, None) + + def _list_armed(self) -> Dict[str, str]: + """Observed armed one-shots: job_id → fire_at. + + Prefer the in-memory map (warm process); on a cold/empty map, ask NAS + (best-effort). If NAS list fails, return what we have — reconcile then + re-arms desired jobs idempotently. + """ + with self._lock: + if self._armed: + return dict(self._armed) + try: + observed = { + item["job_id"]: item.get("fire_at", "") + for item in self._get_client().list_armed() + if item.get("job_id") + } + with self._lock: + self._armed.update(observed) + return observed + except Exception as e: + logger.debug("Chronos _list_armed failed (will re-arm idempotently): %s", e) + return {} + + # -- reconcile -------------------------------------------------------- + + def reconcile(self) -> None: + """Converge the NAS-armed one-shots toward jobs.json (desired state): + arm missing / re-arm changed-time, cancel orphaned.""" + from cron.jobs import load_jobs + + desired: Dict[str, str] = { + j["id"]: j["next_run_at"] + for j in load_jobs() + if j.get("enabled") and j.get("next_run_at") and j.get("state") != "paused" + } + observed = self._list_armed() + + # Arm missing or changed-time. + for job_id, fire_at in desired.items(): + if observed.get(job_id) != fire_at: + # Re-fetch the full job dict to arm (need the whole record). 
+ from cron.jobs import get_job + job = get_job(job_id) + if job: + try: + self._arm_one_shot(job) + except Exception as e: + logger.warning("Chronos failed to arm job %s: %s", job_id, e) + + # Cancel orphans (armed but no longer desired). + for job_id in list(observed.keys()): + if job_id not in desired: + try: + self._cancel(job_id) + except Exception as e: + logger.warning("Chronos failed to cancel orphan %s: %s", job_id, e) + + # -- fire ------------------------------------------------------------- + + def fire_due(self, job_id: str, *, adapters: Any = None, loop: Any = None) -> bool: + """Run the due job (claim + run_one_job via the ABC default), then + re-arm the NEXT one-shot through NAS. + + Re-arm happens AFTER the run so next_run_at reflects the completed fire. + If the job is gone (one-shot completed / repeat-N exhausted), get_job + returns None → nothing to re-arm (the schedule naturally stops). + """ + ran = super().fire_due(job_id, adapters=adapters, loop=loop) + if ran: + from cron.jobs import get_job + job = get_job(job_id) + if job and job.get("enabled") and job.get("next_run_at"): + try: + self._arm_one_shot(job) + except Exception as e: + logger.warning("Chronos failed to re-arm job %s after fire: %s", job_id, e) + return ran + + +def register(ctx) -> None: + """Plugin entrypoint — register the Chronos provider with the loader. + + Mirrors the memory-plugin shape; plugins/cron discovery calls this and + collects the provider via register_cron_scheduler. + """ + ctx.register_cron_scheduler(ChronosCronScheduler()) diff --git a/plugins/cron/chronos/_nas_client.py b/plugins/cron/chronos/_nas_client.py new file mode 100644 index 00000000000..04382adc8ea --- /dev/null +++ b/plugins/cron/chronos/_nas_client.py @@ -0,0 +1,123 @@ +"""Thin HTTP client for the agent → NAS ``agent-cron`` endpoints (Chronos). + +The Chronos provider speaks ONLY to NAS — it names no scheduler vendor and +holds no scheduler credentials. 
NAS owns the external scheduler (an internal +implementation detail) and that scheduler's account; the agent just asks NAS to +"arm a one-shot at time T" / "cancel" / "list", authenticated with the agent's +existing Nous Portal access token (the same token it already uses to call the +portal — no new secret). + +Wire contract: ``docs/chronos-managed-cron-contract.md``. +""" + +from __future__ import annotations + +import logging +from typing import Any, Dict, List, Optional + +logger = logging.getLogger("cron.chronos") + +# Endpoint paths under the portal base URL. +_PROVISION_PATH = "/api/agent-cron/provision" +_CANCEL_PATH = "/api/agent-cron/cancel" +_LIST_PATH = "/api/agent-cron/list" + + +class NasCronClientError(RuntimeError): + """Raised when a NAS agent-cron call fails (non-2xx or transport error).""" + + +class NasCronClient: + """Minimal client for the agent→NAS provision/cancel/list endpoints. + + Uses the agent's refresh-aware Nous access token for auth. No scheduler + vendor, no scheduler creds — NAS hides all of that behind these three calls. 
+ """ + + def __init__(self, portal_url: str, *, timeout_seconds: float = 15.0) -> None: + self.portal_url = portal_url.rstrip("/") + self.timeout_seconds = timeout_seconds + + # -- auth ------------------------------------------------------------- + + def _access_token(self) -> str: + """The agent's existing Nous Portal access token (refresh-aware).""" + from hermes_cli.auth import resolve_nous_access_token + return resolve_nous_access_token() + + def _headers(self) -> Dict[str, str]: + return { + "Authorization": f"Bearer {self._access_token()}", + "Content-Type": "application/json", + } + + # -- HTTP ------------------------------------------------------------- + + def _post(self, path: str, body: Dict[str, Any]) -> Dict[str, Any]: + import requests # lazy: agent already depends on requests + + url = f"{self.portal_url}{path}" + try: + resp = requests.post( + url, json=body, headers=self._headers(), timeout=self.timeout_seconds + ) + except Exception as e: + raise NasCronClientError(f"POST {path} failed: {e}") from e + if resp.status_code // 100 != 2: + raise NasCronClientError( + f"POST {path} returned {resp.status_code}: {resp.text[:200]}" + ) + try: + return resp.json() if resp.content else {} + except Exception: + return {} + + def _get(self, path: str, params: Dict[str, Any]) -> Dict[str, Any]: + import requests + + url = f"{self.portal_url}{path}" + try: + resp = requests.get( + url, params=params, headers=self._headers(), timeout=self.timeout_seconds + ) + except Exception as e: + raise NasCronClientError(f"GET {path} failed: {e}") from e + if resp.status_code // 100 != 2: + raise NasCronClientError( + f"GET {path} returned {resp.status_code}: {resp.text[:200]}" + ) + try: + return resp.json() if resp.content else {} + except Exception: + return {} + + # -- endpoints -------------------------------------------------------- + + def provision(self, *, job_id: str, fire_at: str, agent_callback_url: str, + dedup_key: str) -> Dict[str, Any]: + """Ask NAS to 
arm a one-shot for ``job_id`` at ``fire_at`` (ISO 8601). + + ``dedup_key`` (``{job_id}:{fire_at}``) makes re-arming the same fire + idempotent NAS-side. Returns the NAS response (e.g. ``{schedule_id}``). + """ + return self._post(_PROVISION_PATH, { + "job_id": job_id, + "fire_at": fire_at, + "agent_callback_url": agent_callback_url, + "dedup_key": dedup_key, + }) + + def cancel(self, *, job_id: str) -> Dict[str, Any]: + """Ask NAS to cancel any armed one-shot for ``job_id``.""" + return self._post(_CANCEL_PATH, {"job_id": job_id}) + + def list_armed(self) -> List[Dict[str, Any]]: + """List the one-shots NAS currently has armed for this agent. + + Returns a list of ``{job_id, fire_at, schedule_id}``. Best-effort: used + by reconcile to find orphaned arms on a cold process; on error the + caller falls back to idempotent re-arm of all desired jobs. + """ + data = self._get(_LIST_PATH, {}) + items = data.get("armed") if isinstance(data, dict) else None + return items if isinstance(items, list) else [] diff --git a/plugins/cron/chronos/plugin.yaml b/plugins/cron/chronos/plugin.yaml new file mode 100644 index 00000000000..aad48b35655 --- /dev/null +++ b/plugins/cron/chronos/plugin.yaml @@ -0,0 +1,9 @@ +name: chronos +description: >- + Chronos — NAS-mediated managed cron provider for scale-to-zero hosted agents. + Delegates the "wake me at time T" trigger to Nous infrastructure so an idle + gateway can scale to zero and still fire cron jobs. The agent computes each + job's next-fire time and asks NAS to arm a one-shot; NAS calls the agent back + at fire time over an authenticated webhook. Inert unless cron.provider=chronos. +version: 1.0.0 +author: Nous Research diff --git a/tests/plugins/test_chronos_cron.py b/tests/plugins/test_chronos_cron.py new file mode 100644 index 00000000000..36b32f7a501 --- /dev/null +++ b/tests/plugins/test_chronos_cron.py @@ -0,0 +1,203 @@ +"""Unit tests for the Chronos NAS-mediated cron provider (Phase 4D). 
+ +All NAS calls are mocked — ZERO live network. These prove: + - is_available is config-only (no network), false without config. + - one-shot arming sends the right provision payload (incl. sub-minute fires — + the agent owns the time, so there's no 1-minute floor). + - reconcile arms missing, cancels orphaned, skips paused. + - fire_due re-arms the next one-shot after a successful run, and repeat-N + (job gone) stops re-arming. +""" + +import pytest + + +@pytest.fixture +def temp_home(tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + yield tmp_path + + +@pytest.fixture +def chronos(monkeypatch): + """A ChronosCronScheduler with a fake NAS client capturing calls.""" + from plugins.cron.chronos import ChronosCronScheduler + + class FakeClient: + def __init__(self): + self.provisions = [] + self.cancels = [] + self._armed = [] + + def provision(self, *, job_id, fire_at, agent_callback_url, dedup_key): + self.provisions.append({ + "job_id": job_id, "fire_at": fire_at, + "agent_callback_url": agent_callback_url, "dedup_key": dedup_key, + }) + return {"schedule_id": f"sched-{job_id}"} + + def cancel(self, *, job_id): + self.cancels.append(job_id) + return {} + + def list_armed(self): + return list(self._armed) + + prov = ChronosCronScheduler() + fake = FakeClient() + prov._client = fake + # callback_url is read via _cfg; patch the module helper to avoid config. 
# -- is_available -------------------------------------------------------------

def test_is_available_false_without_config(temp_home, monkeypatch):
    """Every config lookup yields "" → the provider reports unavailable."""
    from plugins.cron.chronos import ChronosCronScheduler

    def blank_cfg(*path, default=""):
        return ""

    monkeypatch.setattr("plugins.cron.chronos._cfg", blank_cfg)
    scheduler = ChronosCronScheduler()
    assert scheduler.is_available() is False


def test_is_available_true_with_config_and_token(temp_home, monkeypatch):
    """Config present and an access token in the auth state → available."""
    import plugins.cron.chronos as mod
    from plugins.cron.chronos import ChronosCronScheduler

    def any_cfg(*path, default=""):
        return "https://x"

    def auth_state(pid):
        return {"access_token": "tok"}

    monkeypatch.setattr(mod, "_cfg", any_cfg)
    monkeypatch.setattr("hermes_cli.auth.get_provider_auth_state", auth_state)
    scheduler = ChronosCronScheduler()
    assert scheduler.is_available() is True


def test_is_available_makes_no_network(temp_home, monkeypatch):
    """is_available must not construct the NAS client / hit network."""
    import plugins.cron.chronos as mod
    from plugins.cron.chronos import ChronosCronScheduler

    def any_cfg(*path, default=""):
        return "https://x"

    def auth_state(pid):
        return {"access_token": "tok"}

    def explode():
        raise AssertionError("is_available must not build the NAS client")

    monkeypatch.setattr(mod, "_cfg", any_cfg)
    monkeypatch.setattr("hermes_cli.auth.get_provider_auth_state", auth_state)
    scheduler = ChronosCronScheduler()
    # Any attempt to build the client would raise and fail the test.
    monkeypatch.setattr(scheduler, "_get_client", explode)
    assert scheduler.is_available() is True  # did not call _get_client


# -- arming -------------------------------------------------------------------

def test_arm_one_shot_sends_provision(chronos):
    """Arming sends exactly one provision carrying id, time, dedup key, URL."""
    prov, fake = chronos
    fire_at = "2026-06-18T12:00:00+00:00"
    prov._arm_one_shot({"id": "j1", "next_run_at": fire_at})

    assert len(fake.provisions) == 1
    sent = fake.provisions[0]
    assert sent["job_id"] == "j1"
    assert sent["fire_at"] == fire_at
    assert sent["dedup_key"] == f"j1:{fire_at}"
    assert sent["agent_callback_url"] == "https://agent.example/"


def test_arm_one_shot_preserves_sub_minute_fire(chronos):
    """Sub-minute fire times survive — the agent owns the time, so there's no
    1-minute scheduler floor."""
    prov, fake = chronos
    fire_at = "2026-06-18T12:00:30+00:00"
    prov._arm_one_shot({"id": "j2", "next_run_at": fire_at})
    first = fake.provisions[0]
    assert first["fire_at"] == fire_at


def test_arm_one_shot_noop_without_next_run(chronos):
    """A job without next_run_at produces no provision call."""
    prov, fake = chronos
    job = {"id": "j3", "next_run_at": None}
    prov._arm_one_shot(job)
    assert fake.provisions == []


# -- reconcile ----------------------------------------------------------------

def test_reconcile_arms_all_enabled(temp_home, chronos, monkeypatch):
    """Two enabled, scheduled jobs → both provisioned, nothing cancelled."""
    prov, fake = chronos
    jobs = [
        {"id": "a", "enabled": True, "next_run_at": "2026-06-18T12:00:00+00:00", "state": "scheduled"},
        {"id": "b", "enabled": True, "next_run_at": "2026-06-18T12:05:00+00:00", "state": "scheduled"},
    ]

    def lookup(jid):
        return next(j for j in jobs if j["id"] == jid)

    monkeypatch.setattr("cron.jobs.load_jobs", lambda: jobs)
    monkeypatch.setattr("cron.jobs.get_job", lookup)

    prov.reconcile()
    armed_ids = {p["job_id"] for p in fake.provisions}
    assert armed_ids == {"a", "b"}
    assert fake.cancels == []


def test_reconcile_cancels_orphan_arms_desired(temp_home, chronos, monkeypatch):
    """A stale arm for a deleted job is cancelled; the live job is armed."""
    prov, fake = chronos
    # NAS already has a stale arm for deleted job "gone".
    prov._armed = {"gone": "2026-06-18T11:00:00+00:00"}
    jobs = [{"id": "a", "enabled": True, "next_run_at": "2026-06-18T12:00:00+00:00", "state": "scheduled"}]

    def lookup(jid):
        return next((j for j in jobs if j["id"] == jid), None)

    monkeypatch.setattr("cron.jobs.load_jobs", lambda: jobs)
    monkeypatch.setattr("cron.jobs.get_job", lookup)

    prov.reconcile()
    armed_ids = [p["job_id"] for p in fake.provisions]
    assert armed_ids == ["a"]
    assert fake.cancels == ["gone"]


def test_reconcile_skips_paused(temp_home, chronos, monkeypatch):
    """A job whose state is "paused" is not provisioned."""
    prov, fake = chronos
    jobs = [{"id": "p", "enabled": True, "next_run_at": "2026-06-18T12:00:00+00:00", "state": "paused"}]

    def lookup(jid):
        return next((j for j in jobs if j["id"] == jid), None)

    monkeypatch.setattr("cron.jobs.load_jobs", lambda: jobs)
    monkeypatch.setattr("cron.jobs.get_job", lookup)

    prov.reconcile()
    assert fake.provisions == []


def test_reconcile_skips_already_armed_same_time(temp_home, chronos, monkeypatch):
    """A job already armed at the same next_run_at is not re-provisioned."""
    prov, fake = chronos
    fire_at = "2026-06-18T12:00:00+00:00"
    prov._armed = {"a": fire_at}
    jobs = [{"id": "a", "enabled": True, "next_run_at": fire_at, "state": "scheduled"}]
    monkeypatch.setattr("cron.jobs.load_jobs", lambda: jobs)
    monkeypatch.setattr("cron.jobs.get_job", lambda jid: jobs[0])

    prov.reconcile()
    assert fake.provisions == []  # already armed at the same time → no re-arm


# -- fire_due re-arm ----------------------------------------------------------

def test_fire_due_rearms_next_oneshot(chronos, monkeypatch):
    """After a successful run the next one-shot is armed at the new time."""
    prov, fake = chronos
    next_fire = "2026-06-18T12:05:00+00:00"

    def ran(self, jid, **kw):
        return True

    def job_after_run(jid):
        return {"id": jid, "enabled": True, "next_run_at": next_fire}

    # super().fire_due runs the job; stub the ABC default to "ran".
    monkeypatch.setattr("cron.scheduler_provider.CronScheduler.fire_due", ran)
    monkeypatch.setattr("cron.jobs.get_job", job_after_run)

    assert prov.fire_due("j1") is True
    armed_ids = [p["job_id"] for p in fake.provisions]
    assert armed_ids == ["j1"]
    assert fake.provisions[0]["fire_at"] == next_fire


def test_fire_due_no_rearm_when_job_gone(chronos, monkeypatch):
    """repeat-N exhausted / one-shot completed → mark_job_run deleted the job →
    get_job None → no re-arm (the schedule stops cleanly)."""
    prov, fake = chronos

    def ran(self, jid, **kw):
        return True

    monkeypatch.setattr("cron.scheduler_provider.CronScheduler.fire_due", ran)
    monkeypatch.setattr("cron.jobs.get_job", lambda jid: None)

    assert prov.fire_due("j1") is True
    assert fake.provisions == []


def test_fire_due_no_rearm_when_claim_lost(chronos, monkeypatch):
    """If the run didn't happen (claim lost), don't re-arm."""
    prov, fake = chronos

    def did_not_run(self, jid, **kw):
        return False

    monkeypatch.setattr("cron.scheduler_provider.CronScheduler.fire_due", did_not_run)

    assert prov.fire_due("j1") is False
    assert fake.provisions == []
Returns claims or None; never raises. Crypto delegated to PyJWT[crypto] (already a declared dep) — no hand-rolled JWT, no new dependency. No key configured → refuse (never unsigned-decode a security boundary). - get_fire_verifier(): pluggable indirection so the DQ-4 escape hatch (direct per-job cron-key) can swap in with no handler change. E.2 — gateway/platforms/api_server.py: - POST /api/cron/fire (registered only when _CRON_AVAILABLE). Authenticated by the NAS-JWT via get_fire_verifier() — NOT API_SERVER_KEY (NAS holds no API key; this is the only inbound that triggers remote job execution, so it gets its own purpose-scoped check). Verifier args come from cron.chronos.* config. 401 on bad/missing/forged token. 400 on missing job_id. On success: 202 + fire_due runs in the background (so a long agent turn never trips NAS's HTTP timeout); the store CAS claim inside fire_due de-dupes a scheduler retry. Tests: - test_chronos_verify (11): REAL RS256 signing — valid→claims, wrong-aud, missing/wrong purpose, expired, wrong-iss, tampered-signature (attacker key), no-key-refuse, empty-token, JWKS-URL key resolution, get_fire_verifier. - test_cron_fire_webhook (5): valid→202+fire, invalid→401+no-fire, missing token→401, missing job_id→400, and fire path does NOT require API_SERVER_KEY. api_server regression suites (214) green. E.3 (NAS endpoints) is a separate cross-repo PR; the wire contract lands next (docs/chronos-managed-cron-contract.md). 
async def _handle_cron_fire(self, request: "web.Request") -> "web.Response":
    """POST /api/cron/fire — Chronos managed-cron fire webhook (NAS → agent).

    Authenticated by a NAS-minted JWT (verified via the pluggable
    fire-verifier), NOT API_SERVER_KEY — NAS holds no API server key, and
    this is the only inbound that can trigger remote job execution, so it
    gets its own purpose-scoped token check.

    Returns 202 + runs the job in the background so a long agent turn never
    trips NAS's HTTP timeout. The store CAS claim inside fire_due guards
    against double-fire on a NAS/scheduler retry.

    Responses:
        401: the verifier returned ``None`` (missing/invalid token).
        400: the body is not a JSON object with a non-empty string ``job_id``.
        202: accepted; ``fire_due`` runs in a background task.
    """
    from hermes_cli.config import cfg_get, load_config
    from plugins.cron.chronos.verify import get_fire_verifier

    auth = request.headers.get("Authorization", "")
    token = auth[7:].strip() if auth.startswith("Bearer ") else ""

    cfg = load_config()
    # The verifier may block (the NAS-JWT verifier resolves its signing key
    # from a JWKS URL over HTTP), so run it in a worker thread instead of
    # stalling every other request on the event loop.
    claims = await asyncio.to_thread(
        get_fire_verifier(),
        token=token,
        expected_audience=cfg_get(cfg, "cron", "chronos", "expected_audience", default=""),
        jwks_or_key=cfg_get(cfg, "cron", "chronos", "nas_jwks_url", default="") or None,
        issuer=cfg_get(cfg, "cron", "chronos", "portal_url", default="") or None,
    )
    if claims is None:
        logger.warning(
            "cron fire: rejected invalid token: %s",
            self._request_audit_log_suffix(request),
        )
        return web.json_response({"error": "invalid fire token"}, status=401)

    try:
        body = await request.json()
    except Exception:
        body = None
    # Valid JSON need not be an object: a list/string/number body has no
    # .get() and must be a 400, not an AttributeError surfacing as a 500.
    job_id = body.get("job_id") if isinstance(body, dict) else None
    if not job_id or not isinstance(job_id, str):
        return web.json_response({"error": "missing job_id"}, status=400)

    from cron.scheduler_provider import resolve_cron_scheduler
    provider = resolve_cron_scheduler()

    loop = asyncio.get_running_loop()
    # Fire in the background (202 immediately). fire_due claims via the
    # store CAS, so a retry while this is in flight is de-duped.
    task = asyncio.create_task(
        asyncio.to_thread(provider.fire_due, job_id, adapters=None, loop=loop)
    )

    def _log_fire_failure(done: "asyncio.Task") -> None:
        # Retrieve the exception so a failed fire is logged here rather than
        # only as "Task exception was never retrieved" at GC time.
        if done.cancelled():
            return
        exc = done.exception()
        if exc is not None:
            logger.error("cron fire: job %s failed in background: %s", job_id, exc)

    task.add_done_callback(_log_fire_failure)
    try:
        # Hold a strong reference so the task is not garbage-collected mid-run.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
    except (TypeError, AttributeError):
        pass

    return web.json_response({"status": "accepted", "job_id": job_id}, status=202)
logger = logging.getLogger("cron.chronos.verify")

# The purpose claim that scopes a token to the fire endpoint. A general agent
# JWT (without this claim) must NOT be replayable against /api/cron/fire.
_FIRE_PURPOSE = "cron_fire"

# Asymmetric algorithms only: an HS* entry would let a PEM public key be used
# as an HMAC secret.
_FIRE_ALGORITHMS = ("RS256", "RS384", "RS512", "ES256", "ES384", "ES512")

# JWKS clients keyed by (client class, URL). Reusing one client per URL lets
# PyJWT's own JWK-set cache work instead of fetching the JWKS on every fire.
# The class is part of the key so a swapped-in client class gets its own entry.
_JWKS_CLIENTS: Dict[Any, Any] = {}


def _jwks_client_for(url: str) -> Any:
    """Return the cached JWKS client for ``url``, creating it on first use."""
    from jwt import PyJWKClient

    cache_key = (PyJWKClient, url)
    client = _JWKS_CLIENTS.get(cache_key)
    if client is None:
        client = _JWKS_CLIENTS[cache_key] = PyJWKClient(url)
    return client


def verify_nas_fire_token(
    *,
    token: str,
    expected_audience: str,
    jwks_or_key: Optional[str] = None,
    issuer: Optional[str] = None,
    leeway_seconds: int = 30,
) -> Optional[Dict[str, Any]]:
    """Verify a NAS-minted cron-fire JWT. Return decoded claims, or None.

    Checks (all must pass):
      - signature against ``jwks_or_key``: a JWKS URL (key resolved from the
        token header) or an inline PEM public key. Only RS*/ES* algorithms
        are accepted; symmetric secrets are rejected.
      - ``aud`` == ``expected_audience`` (this agent: ``agent:{instance_id}``).
      - ``exp`` / ``nbf`` within ``leeway_seconds``.
      - ``iss`` == ``issuer`` when an issuer is configured.
      - ``purpose`` == ``"cron_fire"`` — so a general agent JWT can't be
        replayed against the fire endpoint.

    Returns None (never raises) on any failure, so the handler can answer 401
    without leaking which check failed. With a JWKS URL this call may block on
    network I/O.
    """
    if not token or not expected_audience:
        return None
    if not jwks_or_key:
        # No verification key configured → cannot verify → refuse. We never
        # fall back to unsigned decode for a security boundary.
        logger.warning("cron fire: no JWKS/key configured; refusing token")
        return None

    try:
        import jwt

        if jwks_or_key.startswith(("http://", "https://")):
            # NOTE(review): plain http:// is still accepted for backward
            # compatibility, but keys fetched over it can be substituted in
            # transit — production config should use https://.
            signing_key = _jwks_client_for(jwks_or_key).get_signing_key_from_jwt(token).key
        else:
            # A PEM public key passed inline (test / pinned-key deployments).
            signing_key = jwks_or_key

        decode_kwargs: Dict[str, Any] = dict(
            algorithms=list(_FIRE_ALGORITHMS),
            audience=expected_audience,
            leeway=leeway_seconds,
            options={"require": ["exp", "aud"]},
        )
        if issuer:
            decode_kwargs["issuer"] = issuer

        claims = jwt.decode(token, signing_key, **decode_kwargs)
    except Exception as e:
        # Includes a missing PyJWT install, JWKS fetch errors and every
        # signature/claim failure: all are answered the same way.
        logger.warning("cron fire: token verification failed: %s", e)
        return None

    if claims.get("purpose") != _FIRE_PURPOSE:
        logger.warning("cron fire: token missing/!=%s purpose claim", _FIRE_PURPOSE)
        return None

    return claims


def get_fire_verifier() -> Callable[..., Optional[Dict[str, Any]]]:
    """Return the active inbound-fire verifier.

    Default = the NAS-JWT verifier. The DQ-4 escape hatch (direct per-job
    cron-key) would return a cron-key verifier here instead, selected by config
    — so the webhook handler never changes when the auth mode is swapped.
    """
    return verify_nas_fire_token
_MOD = "gateway.platforms.api_server"


def _make_adapter() -> APIServerAdapter:
    """Build an adapter configured with the key "sk-secret"."""
    config = PlatformConfig(enabled=True, extra={"key": "sk-secret"})
    return APIServerAdapter(config)


def _create_app(adapter: APIServerAdapter) -> web.Application:
    """Minimal aiohttp app exposing only the cron-fire route."""
    application = web.Application(middlewares=[cors_middleware])
    application["api_server_adapter"] = adapter
    application.router.add_post("/api/cron/fire", adapter._handle_cron_fire)
    return application


@pytest.fixture
def adapter():
    return _make_adapter()


class _SpyProvider:
    """Records fire_due calls; stands in for the resolved provider."""

    def __init__(self):
        self.fired = []

    def fire_due(self, job_id, *, adapters=None, loop=None):
        self.fired.append(job_id)
        return True


def _install_spy(monkeypatch):
    """Make resolve_cron_scheduler() return a fresh spy, and return it."""
    spy = _SpyProvider()
    monkeypatch.setattr("cron.scheduler_provider.resolve_cron_scheduler", lambda: spy)
    return spy


def _install_verifier(monkeypatch, verdict):
    """Make the fire-verifier answer ``verdict`` for any keyword arguments."""
    monkeypatch.setattr(
        "plugins.cron.chronos.verify.get_fire_verifier",
        lambda: (lambda **kw: verdict),
    )


async def _wait_for_fire(spy):
    """Poll up to 50 × 10 ms for the background fire to land."""
    for _ in range(50):
        if spy.fired:
            return
        await asyncio.sleep(0.01)


@pytest.mark.asyncio
async def test_valid_token_accepts_and_fires(adapter, monkeypatch):
    """Valid NAS-JWT + {job_id} → 202 and fire_due invoked with that id."""
    spy = _install_spy(monkeypatch)
    # verifier returns claims (valid token)
    _install_verifier(monkeypatch, {"purpose": "cron_fire", "aud": "agent:x"})

    async with TestClient(TestServer(_create_app(adapter))) as cli:
        resp = await cli.post(
            "/api/cron/fire",
            headers={"Authorization": "Bearer good"},
            json={"job_id": "abc123"},
        )
        assert resp.status == 202
        payload = await resp.json()
        assert payload["job_id"] == "abc123"

    # fire runs in a background thread/task — give it a beat to land.
    await _wait_for_fire(spy)
    assert spy.fired == ["abc123"]


@pytest.mark.asyncio
async def test_invalid_token_401_and_no_fire(adapter, monkeypatch):
    """Bad/forged token → 401, fire_due NOT invoked."""
    spy = _install_spy(monkeypatch)
    _install_verifier(monkeypatch, None)  # verification fails

    async with TestClient(TestServer(_create_app(adapter))) as cli:
        resp = await cli.post(
            "/api/cron/fire",
            headers={"Authorization": "Bearer forged"},
            json={"job_id": "abc123"},
        )
        assert resp.status == 401

    await asyncio.sleep(0.05)
    assert spy.fired == []


@pytest.mark.asyncio
async def test_missing_token_401(adapter, monkeypatch):
    """No Authorization header → verifier gets empty token → 401."""
    spy = _install_spy(monkeypatch)
    # Real verifier: empty token returns None.
    async with TestClient(TestServer(_create_app(adapter))) as cli:
        resp = await cli.post("/api/cron/fire", json={"job_id": "abc123"})
        assert resp.status == 401
    assert spy.fired == []


@pytest.mark.asyncio
async def test_missing_job_id_400(adapter, monkeypatch):
    """Valid token but no job_id → 400, no fire."""
    spy = _install_spy(monkeypatch)
    _install_verifier(monkeypatch, {"purpose": "cron_fire"})

    async with TestClient(TestServer(_create_app(adapter))) as cli:
        resp = await cli.post(
            "/api/cron/fire",
            headers={"Authorization": "Bearer good"},
            json={},
        )
        assert resp.status == 400
    assert spy.fired == []


@pytest.mark.asyncio
async def test_fire_does_not_require_api_server_key(adapter, monkeypatch):
    """The fire endpoint must NOT gate on API_SERVER_KEY — auth is the NAS-JWT.
    A request with NO API key header but a valid fire token still succeeds."""
    spy = _install_spy(monkeypatch)
    _install_verifier(monkeypatch, {"purpose": "cron_fire"})

    async with TestClient(TestServer(_create_app(adapter))) as cli:
        # Bearer is the FIRE token, not the API_SERVER_KEY "sk-secret".
        resp = await cli.post(
            "/api/cron/fire",
            headers={"Authorization": "Bearer nas-jwt"},
            json={"job_id": "j9"},
        )
        assert resp.status == 202
    await _wait_for_fire(spy)
    assert spy.fired == ["j9"]
@pytest.fixture(scope="module")
def rsa_keys():
    """An RS256 keypair: (private_pem, public_pem)."""
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    private_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    ).decode()
    public_pem = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    ).decode()
    return private_pem, public_pem


def _mint(priv, claims):
    """Sign ``claims`` as an RS256 JWT with the PEM private key ``priv``."""
    import jwt
    return jwt.encode(claims, priv, algorithm="RS256")


AUD = "agent:inst-123"
ISS = "https://portal.nousresearch.com"


def _base_claims(**over):
    """A currently-valid cron_fire claim set, with keyword overrides applied."""
    now = int(time.time())
    defaults = {
        "aud": AUD,
        "iss": ISS,
        "purpose": "cron_fire",
        "iat": now,
        "nbf": now - 5,
        "exp": now + 300,
    }
    return {**defaults, **over}


def _verify(token, key):
    """Run the verifier with this module's AUD / ISS and the given key."""
    from plugins.cron.chronos.verify import verify_nas_fire_token

    return verify_nas_fire_token(
        token=token, expected_audience=AUD, jwks_or_key=key, issuer=ISS
    )


def test_valid_token_returns_claims(rsa_keys):
    """A correctly signed, in-date, purpose-scoped token yields its claims."""
    priv, pub = rsa_keys
    result = _verify(_mint(priv, _base_claims()), pub)
    assert result is not None
    assert result["purpose"] == "cron_fire"
    assert result["aud"] == AUD


def test_wrong_audience_rejected(rsa_keys):
    """A token minted for another agent's audience is refused."""
    priv, pub = rsa_keys
    foreign = _mint(priv, _base_claims(aud="agent:someone-else"))
    assert _verify(foreign, pub) is None


def test_missing_purpose_rejected(rsa_keys):
    """A general agent JWT (no purpose=cron_fire) can't fire jobs."""
    priv, pub = rsa_keys
    payload = _base_claims()
    payload.pop("purpose")
    assert _verify(_mint(priv, payload), pub) is None


def test_wrong_purpose_rejected(rsa_keys):
    """A token scoped to a different purpose is refused."""
    priv, pub = rsa_keys
    other_purpose = _mint(priv, _base_claims(purpose="inference"))
    assert _verify(other_purpose, pub) is None


def test_expired_token_rejected(rsa_keys):
    """A token whose exp is 600 s in the past is refused."""
    priv, pub = rsa_keys
    now = int(time.time())
    stale = _mint(priv, _base_claims(iat=now - 1000, nbf=now - 1000, exp=now - 600))
    assert _verify(stale, pub) is None


def test_wrong_issuer_rejected(rsa_keys):
    """A token from an unexpected issuer is refused."""
    priv, pub = rsa_keys
    forged_iss = _mint(priv, _base_claims(iss="https://evil.example"))
    assert _verify(forged_iss, pub) is None


def test_tampered_signature_rejected(rsa_keys):
    """A token signed by a DIFFERENT key must fail signature verification."""
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    _, pub = rsa_keys
    attacker_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    attacker_pem = attacker_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    ).decode()
    forged = _mint(attacker_pem, _base_claims())
    # Verified against the REAL public key → signature mismatch → None.
    assert _verify(forged, pub) is None


def test_no_key_configured_refuses(rsa_keys):
    """No JWKS/key configured → refuse (never fall back to unsigned decode)."""
    from plugins.cron.chronos.verify import verify_nas_fire_token

    priv, _ = rsa_keys
    signed = _mint(priv, _base_claims())
    outcome = verify_nas_fire_token(token=signed, expected_audience=AUD,
                                    jwks_or_key=None)
    assert outcome is None


def test_empty_token_refused(rsa_keys):
    """An empty token string is refused even when a key is configured."""
    from plugins.cron.chronos.verify import verify_nas_fire_token

    _, pub = rsa_keys
    outcome = verify_nas_fire_token(token="", expected_audience=AUD, jwks_or_key=pub)
    assert outcome is None


def test_jwks_url_path_resolves_key(rsa_keys, monkeypatch):
    """The JWKS-URL branch resolves the signing key via PyJWKClient."""
    priv, pub = rsa_keys
    signed = _mint(priv, _base_claims())
    jwks_url = "https://portal.nousresearch.com/.well-known/jwks.json"

    class FakeKey:
        key = pub

    class FakeJWKClient:
        def __init__(self, url):
            assert url == "https://portal.nousresearch.com/.well-known/jwks.json"

        def get_signing_key_from_jwt(self, tok):
            return FakeKey()

    monkeypatch.setattr("jwt.PyJWKClient", FakeJWKClient)
    result = _verify(signed, jwks_url)
    assert result is not None and result["purpose"] == "cron_fire"


def test_get_fire_verifier_returns_nas_verifier():
    """The default fire verifier is the NAS-JWT verifier itself."""
    from plugins.cron.chronos.verify import get_fire_verifier, verify_nas_fire_token

    assert get_fire_verifier() is verify_nas_fire_token
=?UTF-8?q?ract?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 4F (F.1 + F.2 + F.3, agent side). F.4 is the operator-run live smoke (needs a NAS deployment); recorded in the PR, not code. F.1 — on_jobs_changed wiring: - cron/scheduler.py: _notify_provider_jobs_changed() — resolve the active provider, call on_jobs_changed(), swallow errors. Lives in scheduler.py (not jobs.py) so the store stays free of provider imports (no import cycle). - Wired at the consumer surfaces AFTER a successful mutation: the cronjob model tool (tools/cronjob_tools.py, create/update/remove/pause/resume) — which the `hermes cron` CLI also routes through — and the REST handlers (gateway/platforms/api_server.py, same five). Built-in's no-op default = zero behavior change on the default path. Sleeping-agent direct jobs.json writes (no tool/CLI/REST) are covered by reconcile-on-wake in start(). F.2 — config: cron.chronos.{portal_url,callback_url,expected_audience, nas_jwks_url}. All non-secret; the agent holds no scheduler creds and the outbound provision call reuses the existing Nous token (no token key). Additive deep-merge key, no version literal. F.3 — docs: - docs/chronos-managed-cron-contract.md: authoritative agent↔NAS wire contract (the three agent-cron endpoints + inbound /api/cron/fire + the 3-hop trust model + at-most-once/re-arm semantics). This is what the NAS-side agent builds against. - cron-internals.md: "Managed cron (Chronos) for scale-to-zero" section. - cli-commands.md: cron.provider accepts chronos + the cron.chronos.* keys. - User docs name no scheduler vendor (QStash is a NAS-internal detail). INVARIANT re-verified: zero qstash/upstash hits across plugins/cron, gateway, hermes_cli, tools, website/docs (the one remaining repo hit is an unrelated Context7 MCP comment in tools/mcp_tool.py). Tests: test_jobs_changed_notify (5) — notify calls provider hook, swallows errors, built-in harmless, tool create/remove notify. 
def _notify_provider_jobs_changed() -> None:
    """Best-effort notification that the cron job set changed.

    Resolves the active scheduler provider and calls its ``on_jobs_changed()``
    hook. Intended to be called by the consumer surfaces (model tool / CLI /
    REST) AFTER a successful store mutation (create/update/remove/pause/resume)
    so an external provider (Chronos) can re-provision/cancel via NAS.

    The provider import is local so this module — not cron/jobs.py — carries
    the dependency, keeping the store free of provider imports. Any exception
    is logged at debug level and swallowed; this never raises into the caller.
    """
    try:
        from cron.scheduler_provider import resolve_cron_scheduler

        provider = resolve_cron_scheduler()
        provider.on_jobs_changed()
    except Exception as exc:
        logger.debug("on_jobs_changed notify failed: %s", exc)
diff --git a/docs/chronos-managed-cron-contract.md b/docs/chronos-managed-cron-contract.md new file mode 100644 index 00000000000..0848d5eb939 --- /dev/null +++ b/docs/chronos-managed-cron-contract.md @@ -0,0 +1,192 @@ +# Chronos managed-cron — agent ↔ NAS wire contract + +**Status:** authoritative wire spec for the Chronos cron provider. +**Audience:** the NAS-side implementer of the `agent-cron` endpoints +(`nous-account-service`) and anyone debugging the managed-cron path. + +Chronos lets a hosted Hermes gateway **scale to zero** while idle and still +fire cron jobs. Instead of an in-process 60-second ticker, the agent asks NAS +to arm exactly **one external one-shot per job at that job's real next-fire +time**. NAS calls the agent back at fire time over an authenticated webhook; +the agent runs the job and re-arms the next one-shot. Between fires the agent +process can be fully stopped — it wakes only on a genuine fire. + +The external scheduler NAS uses to implement the one-shots is an **internal NAS +implementation detail**. The agent never talks to it, never holds its +credentials, and never names it. The agent only knows the three NAS endpoints +below. 
+ +``` +create/update/pause/resume/remove a cron job (agent side) + │ + ▼ +ChronosCronScheduler.reconcile() ── agent computes next_run_at + │ POST {portal}/api/agent-cron/provision (auth: agent's Nous access token) + ▼ +NAS arms a one-shot for fire_at ── NAS owns the scheduler + its creds + │ + ⏰ at fire_at + ▼ +scheduler → POST {portal}/api/agent-cron/relay (auth: scheduler signature, NAS-verified) + │ + ▼ +NAS mints a short-lived agent-audience JWT (purpose=cron_fire) + │ POST {agent_callback_url}/api/cron/fire (auth: that JWT) + ▼ +agent verifies the NAS JWT → store CAS claim → run_one_job → re-arm next one-shot +``` + +## Trust model (read this first) + +| Hop | Who calls whom | Auth mechanism | Verified by | +|---|---|---|---| +| 1 | agent → NAS (`provision`/`cancel`/`list`) | the agent's existing **Nous Portal access token** (Bearer) | NAS (its normal agent-token path) | +| 2 | scheduler → NAS (`relay`) | the scheduler's request **signature** | NAS (the signature path it already has) | +| 3 | NAS → agent (`/api/cron/fire`) | a **short-lived NAS-minted JWT** (`aud=agent:{instance_id}`, `purpose=cron_fire`) | agent (PyJWT against NAS JWKS) | + +Why NAS-mediated rather than scheduler→agent direct: the scheduler signs with +**NAS's** keys, which the agent does not (and should not) hold. The agent can +only verify a **NAS-minted** token — a trust path it already has. This keeps +all scheduler credentials inside NAS. (Full rationale: the plan's DQ-4.) + +No new secret is introduced on the agent: hop 1 reuses the token the agent +already uses for the portal, and hop 3 reuses the NAS-JWT verification the agent +already performs. + +--- + +## Endpoint 1 — `POST /api/agent-cron/provision` (agent → NAS) + +Arm (or re-arm, idempotently) exactly one one-shot for a job. + +- **Auth:** `Authorization: Bearer <nous-portal-access-token>`. NAS validates via
+- **Request body:** + ```json + { + "job_id": "ab12cd34", + "fire_at": "2026-06-18T12:34:56+00:00", + "agent_callback_url": "https://agent-xyz.fly.dev", + "dedup_key": "ab12cd34:2026-06-18T12:34:56+00:00" + } + ``` + - `fire_at` — ISO 8601, **agent-computed**. May be sub-minute in the future; + NAS must honor second-granularity (the agent owns the time, so there is no + 1-minute scheduler floor). + - `agent_callback_url` — the agent's own publicly-reachable base URL. NAS + POSTs `{agent_callback_url}/api/cron/fire` at fire time. + - `dedup_key` — `"{job_id}:{fire_at}"`. NAS **upserts by `(agent_id, job_id)`** + so re-arming the same fire is idempotent (no duplicate one-shots). A new + `fire_at` for the same `job_id` replaces the prior arm. +- **Action:** arm one one-shot to fire at `fire_at`, destined for the NAS + **relay** route (Endpoint 3) — NOT the agent directly, so NAS stays in the + loop to mint the agent JWT. Persist `(agent_id, job_id, schedule_id, + agent_callback_url)`. +- **Response:** `200 {"schedule_id": ""}`. + +## Endpoint 2 — `POST /api/agent-cron/cancel` (agent → NAS) + +- **Auth:** same as Endpoint 1. +- **Body:** `{"job_id": "ab12cd34"}`. +- **Action:** cancel the armed one-shot for `(agent_id, job_id)` and delete the + row. Idempotent — cancelling an unknown job is a 200 no-op. +- **Response:** `200 {"ok": true}`. + +## Endpoint 3 — `POST /api/agent-cron/relay` (scheduler → NAS, the fire relay) + +- **Auth:** the scheduler's request **signature**, verified by NAS with the + signature path it already has. This is the trust boundary for the fire — a + forged relay call must be rejected here. +- **Action:** + 1. Look up `(agent_id, job_id) → agent_callback_url` from the persisted row. + 2. Mint a **short-lived** JWT: `aud = "agent:{instance_id}"`, + `iss = {portal_url}`, `purpose = "cron_fire"`, small `exp` (≈60–120s), + signed with NAS's normal asymmetric signing key (published via JWKS). + 3. 
`POST {agent_callback_url}/api/cron/fire` with + `Authorization: Bearer <NAS-minted JWT>` and body `{"job_id": "...", "fire_at": "..."}`. + 4. Treat a non-2xx agent response as a **retryable** failure (let the + scheduler retry the relay). The agent's store CAS de-dupes a double fire, + so retries are safe. +- **Response to the scheduler:** 2xx once the agent POST is accepted (202), so + the scheduler does not retry a delivered fire. + +--- + +## Inbound `POST /api/cron/fire` (NAS → agent) — agent side, already implemented + +This is the agent endpoint NAS calls in Endpoint 3 step 3. Implemented on the +`APIServerAdapter` (`gateway/platforms/api_server.py`); the verifier is +`plugins/cron/chronos/verify.py`. + +- **Auth:** `Authorization: Bearer `. The agent verifies: + - signature against the NAS JWKS (`cron.chronos.nas_jwks_url`), + - `aud` == `cron.chronos.expected_audience` (this agent's + `agent:{instance_id}`), + - `iss` == `cron.chronos.portal_url`, + - `exp` / `nbf` (30s leeway), + - `purpose == "cron_fire"` — a general agent JWT (no/other purpose) is + rejected so it can't be replayed against this endpoint. +- **Body:** `{"job_id": "ab12cd34", "fire_at": "..."}` (only `job_id` is used). +- **Behavior:** + - invalid/missing/forged/expired/wrong-aud/wrong-purpose token → **401**, no + execution. + - missing `job_id` → **400**. + - valid → **202 `{"status": "accepted", "job_id": "..."}`** immediately, and + the job runs in the background. 202-before-run means a long agent turn never + trips the relay's HTTP timeout. +- **At-most-once:** the agent claims the job with a store-level compare-and-set + (`claim_job_for_fire`) before running. A relay/scheduler retry that arrives + while the first fire is in flight (or after it completed) loses the claim and + does not double-run. 
+ +--- + +## At-most-once & re-arm semantics + +- **Recurring (cron/interval):** on fire, the agent advances `next_run_at` + (under its store lock) as part of the claim, runs the job, then re-provisions + a one-shot for the new `next_run_at`. A duplicate relay for the old `fire_at` + finds the claim taken / time advanced and is dropped. +- **One-shot (`30m`, `+90s`, etc.):** fires once; `mark_job_run` marks it + completed. No re-arm. +- **`repeat.times = N`:** `mark_job_run` deletes the job at the limit, so + `get_job` returns `None` after the final fire → the agent does **not** re-arm + → the schedule stops cleanly with no orphaned one-shot. +- **Multi-replica agents:** the store CAS makes the fire at-most-once across N + gateway replicas sharing one `HERMES_HOME` — exactly one replica runs each + fire. + +## Reconcile (self-healing) + +The agent reconciles desired (`jobs.json`) vs armed on: +- `start()` (gateway boot / wake), +- every successful job mutation (`on_jobs_changed`), +- piggybacked after each fire (re-arm). + +Reconcile arms missing/changed-time jobs and cancels orphans. A missed +provision (transient NAS error) self-heals on the next reconcile. There is **no +periodic wake** of a sleeping agent — that would negate scale-to-zero. + +## Config (agent side) + +All non-secret (`cron.chronos.*` in `config.yaml`); the agent holds no scheduler +credentials. 
For hosted agents NAS sets these at provision time: + +| key | meaning | +|---|---| +| `cron.provider` | `"chronos"` to activate (empty = built-in ticker) | +| `cron.chronos.portal_url` | NAS base URL (also the expected JWT `iss`) | +| `cron.chronos.callback_url` | the agent's own public base URL for NAS→agent fires | +| `cron.chronos.expected_audience` | this agent's JWT `aud` (`agent:{instance_id}`) | +| `cron.chronos.nas_jwks_url` | NAS JWKS for verifying the fire JWT | + +If `callback_url` / `portal_url` is blank or the agent has no Nous login, +`is_available()` returns False and the resolver falls back to the built-in +in-process ticker — cron never loses its trigger. + +## Escape hatch (not default) + +The inbound `/api/cron/fire` verifier is pluggable (`get_fire_verifier()`). If +relay volume through NAS ever saturates, a direct scheduler→agent mode with a +per-job NAS-minted cron-key can replace the NAS-JWT verifier with **no change to +the webhook handler**. NAS-mediated (this contract) is the default. diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index c657f4b4c6d..f7e1ba42f85 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -717,6 +717,16 @@ except ImportError: _cron_resume = None _cron_trigger = None + +def _notify_cron_provider_jobs_changed() -> None: + """Tell the active cron scheduler provider the job set changed after a REST + mutation (no-op for the built-in). Best-effort — never breaks the handler.""" + try: + from cron.scheduler import _notify_provider_jobs_changed + _notify_provider_jobs_changed() + except Exception: + pass + # Defense-in-depth: mirror the agent-facing cronjob tool, which scans the # user-supplied prompt for exfiltration/injection payloads at create/update # time (tools/cronjob_tools.py). 
The REST cron endpoints are authenticated @@ -3206,6 +3216,7 @@ class APIServerAdapter(BasePlatformAdapter): kwargs["repeat"] = repeat job = _cron_create(**kwargs) + _notify_cron_provider_jobs_changed() return web.json_response({"job": job}) except Exception as e: return web.json_response({"error": str(e)}, status=500) @@ -3262,6 +3273,7 @@ class APIServerAdapter(BasePlatformAdapter): job = _cron_update(job_id, sanitized) if not job: return web.json_response({"error": "Job not found"}, status=404) + _notify_cron_provider_jobs_changed() return web.json_response({"job": job}) except Exception as e: return web.json_response({"error": str(e)}, status=500) @@ -3281,6 +3293,7 @@ class APIServerAdapter(BasePlatformAdapter): success = _cron_remove(job_id) if not success: return web.json_response({"error": "Job not found"}, status=404) + _notify_cron_provider_jobs_changed() return web.json_response({"ok": True}) except Exception as e: return web.json_response({"error": str(e)}, status=500) @@ -3300,6 +3313,7 @@ class APIServerAdapter(BasePlatformAdapter): job = _cron_pause(job_id) if not job: return web.json_response({"error": "Job not found"}, status=404) + _notify_cron_provider_jobs_changed() return web.json_response({"job": job}) except Exception as e: return web.json_response({"error": str(e)}, status=500) @@ -3319,6 +3333,7 @@ class APIServerAdapter(BasePlatformAdapter): job = _cron_resume(job_id) if not job: return web.json_response({"error": "Job not found"}, status=404) + _notify_cron_provider_jobs_changed() return web.json_response({"job": job}) except Exception as e: return web.json_response({"error": str(e)}, status=500) diff --git a/hermes_cli/config.py b/hermes_cli/config.py index d53393ac432..79f56be5d2e 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -2132,6 +2132,25 @@ DEFAULT_CONFIG = { # An unknown or unavailable provider falls back to the built-in, so cron # never loses its trigger. 
"provider": "", + # Chronos (NAS-mediated managed cron) settings. Only consulted when + # provider == "chronos". All non-secret (URLs + the JWT audience): the + # agent holds NO external-scheduler credentials. For hosted agents, NAS + # sets these at provision time. The outbound provision call reuses the + # agent's existing Nous Portal token — there is no token key here. + "chronos": { + # NAS / portal base URL the agent calls to arm/cancel one-shots + # and that mints the inbound fire JWT (used as the expected issuer). + "portal_url": "https://portal.nousresearch.com", + # The agent's OWN publicly-reachable base URL for NAS→agent fires + # (NAS POSTs {callback_url}/api/cron/fire). Empty → Chronos is + # unavailable and the resolver falls back to the built-in ticker. + "callback_url": "", + # This agent's expected JWT audience (e.g. "agent:{instance_id}"). + "expected_audience": "", + # NAS JWKS URL for verifying the inbound fire JWT's signature. + # Empty → the fire endpoint refuses all tokens (no unsigned decode). + "nas_jwks_url": "", + }, # Wrap delivered cron responses with a header (task name) and footer # ("The agent cannot see this message"). Set to false for clean output. "wrap_response": True, diff --git a/tests/cron/test_jobs_changed_notify.py b/tests/cron/test_jobs_changed_notify.py new file mode 100644 index 00000000000..eed875186b4 --- /dev/null +++ b/tests/cron/test_jobs_changed_notify.py @@ -0,0 +1,101 @@ +"""Tests for on_jobs_changed wiring (Phase 4F.1). + +After a store mutation via the consumer surfaces (model tool / CLI / REST), the +active scheduler provider's on_jobs_changed() must be invoked so an external +provider (Chronos) re-provisions/cancels. The built-in's no-op default means +the default path is unchanged. 
+""" + +import pytest + + +@pytest.fixture +def temp_home(tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + yield tmp_path + + +def test_notify_helper_calls_provider_on_jobs_changed(monkeypatch): + """cron.scheduler._notify_provider_jobs_changed resolves the provider and + calls on_jobs_changed exactly once.""" + import cron.scheduler_provider as sp + import cron.scheduler as sched + + calls = [] + + class Spy(sp.CronScheduler): + @property + def name(self): + return "spy" + + def start(self, stop_event, **kw): + pass + + def on_jobs_changed(self): + calls.append(1) + + monkeypatch.setattr(sp, "resolve_cron_scheduler", lambda: Spy()) + sched._notify_provider_jobs_changed() + assert calls == [1] + + +def test_notify_helper_swallows_provider_errors(monkeypatch): + """A provider that raises in on_jobs_changed must not propagate into the + caller (best-effort notify).""" + import cron.scheduler_provider as sp + import cron.scheduler as sched + + class Boom(sp.CronScheduler): + @property + def name(self): + return "boom" + + def start(self, stop_event, **kw): + pass + + def on_jobs_changed(self): + raise RuntimeError("kaboom") + + monkeypatch.setattr(sp, "resolve_cron_scheduler", lambda: Boom()) + sched._notify_provider_jobs_changed() # must not raise + + +def test_builtin_notify_is_harmless(monkeypatch): + """With the built-in provider (default), notify is a no-op and never + raises.""" + import cron.scheduler as sched + # default resolution → built-in; just assert it doesn't blow up. 
+ sched._notify_provider_jobs_changed() + + +def test_tool_create_notifies_provider(temp_home, monkeypatch): + """Creating a job via the cronjob tool path invokes on_jobs_changed.""" + import cron.scheduler as sched + calls = [] + monkeypatch.setattr(sched, "_notify_provider_jobs_changed", + lambda: calls.append("changed")) + + from tools.cronjob_tools import cronjob + import json + + out = json.loads(cronjob(action="create", prompt="echo hi", schedule="every 5m", name="w")) + assert out["success"] is True + assert calls == ["changed"] + + +def test_tool_remove_notifies_provider(temp_home, monkeypatch): + """Removing a job via the tool path invokes on_jobs_changed.""" + import json + from tools.cronjob_tools import cronjob + + created = json.loads(cronjob(action="create", prompt="x", schedule="every 5m", name="r")) + jid = created["job_id"] + + import cron.scheduler as sched + calls = [] + monkeypatch.setattr(sched, "_notify_provider_jobs_changed", + lambda: calls.append("changed")) + + out = json.loads(cronjob(action="remove", job_id=jid)) + assert out["success"] is True + assert calls == ["changed"] diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py index 7ec31b806c4..0bd62b2fc37 100644 --- a/tools/cronjob_tools.py +++ b/tools/cronjob_tools.py @@ -33,6 +33,16 @@ from cron.jobs import ( ) +def _notify_provider_jobs_changed_safe() -> None: + """Tell the active cron scheduler provider the job set changed (no-op for + the built-in). 
Best-effort — never lets a provider error break the tool.""" + try: + from cron.scheduler import _notify_provider_jobs_changed + _notify_provider_jobs_changed() + except Exception: + pass + + # --------------------------------------------------------------------------- # Cron prompt scanning # --------------------------------------------------------------------------- @@ -549,6 +559,7 @@ def cronjob( workdir=_normalize_optional_job_value(workdir), no_agent=_no_agent, ) + _notify_provider_jobs_changed_safe() return json.dumps( { "success": True, @@ -604,6 +615,7 @@ def cronjob( removed = remove_job(job_id) if not removed: return tool_error(f"Failed to remove job '{job_id}'", success=False) + _notify_provider_jobs_changed_safe() return json.dumps( { "success": True, @@ -619,10 +631,12 @@ def cronjob( if normalized == "pause": updated = pause_job(job_id, reason=reason) + _notify_provider_jobs_changed_safe() return json.dumps({"success": True, "job": _format_job(updated)}, indent=2) if normalized == "resume": updated = resume_job(job_id) + _notify_provider_jobs_changed_safe() return json.dumps({"success": True, "job": _format_job(updated)}, indent=2) if normalized in {"run", "run_now", "trigger"}: @@ -711,6 +725,7 @@ def cronjob( if not updates: return tool_error("No updates provided.", success=False) updated = update_job(job_id, updates) + _notify_provider_jobs_changed_safe() return json.dumps({"success": True, "job": _format_job(updated)}, indent=2) return tool_error(f"Unknown cron action '{action}'", success=False) diff --git a/website/docs/developer-guide/cron-internals.md b/website/docs/developer-guide/cron-internals.md index c895d339b09..386302554d7 100644 --- a/website/docs/developer-guide/cron-internals.md +++ b/website/docs/developer-guide/cron-internals.md @@ -129,6 +129,48 @@ A provider only controls the trigger, never execution. In CLI mode, cron jobs only fire when `hermes cron` commands are run or during active CLI sessions. 
+### Managed cron (Chronos) for scale-to-zero + +Hosted gateways can run the **Chronos** provider (`cron.provider: chronos`) +instead of the built-in ticker. Chronos lets an idle gateway **scale to zero** +and still fire cron jobs: rather than a 60-second in-process loop (which would +keep the process awake), it asks Nous infrastructure to arm exactly **one +managed one-shot per job at that job's real next-fire time**. At fire time Nous +calls the gateway back over an authenticated webhook (`POST /api/cron/fire`); +the gateway runs the job through the same `run_one_job` path as the built-in, +then re-arms the next one-shot. Between fires the process can be fully stopped — +it wakes only on a genuine fire, never on a periodic timer. + +The flow (the managed scheduler is provided by Nous; the agent holds no +scheduler credentials): + +``` +create/update a cron job + → Chronos asks Nous to arm a one-shot at the job's next_run_at + (authenticated with the agent's existing Nous token) + → at fire time Nous calls the gateway: POST {callback_url}/api/cron/fire + (authenticated with a short-lived, purpose-scoped Nous-minted JWT) + → the gateway verifies the token, claims the job (store compare-and-set so + multi-replica deployments fire at-most-once), runs it, and re-arms the next + one-shot +``` + +Config (all non-secret; on hosted agents Nous sets these at provision time): + +| key | meaning | +|---|---| +| `cron.provider` | `chronos` to activate (empty = built-in ticker) | +| `cron.chronos.portal_url` | Nous base URL (arming + the fire-token issuer) | +| `cron.chronos.callback_url` | the gateway's own public base URL for inbound fires | +| `cron.chronos.expected_audience` | this agent's fire-token audience | +| `cron.chronos.nas_jwks_url` | key set for verifying the inbound fire token | + +If Chronos is misconfigured or the agent isn't logged into Nous, +`resolve_cron_scheduler()` falls back to the built-in ticker (logged warning) — +cron never loses its trigger. 
Recurring jobs re-arm after each fire; `repeat`-N +jobs stop cleanly when the count is exhausted (no orphaned one-shot). The full +agent↔Nous wire contract lives in `docs/chronos-managed-cron-contract.md`. + ### Fresh Session Isolation Each cron job runs in a completely fresh agent session: diff --git a/website/docs/reference/cli-commands.md b/website/docs/reference/cli-commands.md index f0fe67d4349..0cf004f1a0c 100644 --- a/website/docs/reference/cli-commands.md +++ b/website/docs/reference/cli-commands.md @@ -534,11 +534,13 @@ hermes cron | `tick` | Run due jobs once and exit. | The cron **trigger** is pluggable via the `cron.provider` config key. Empty -(the default) uses the built-in in-process ticker. A named provider (e.g. -`chronos`, a managed-cron provider for scale-to-zero deployments) is discovered -from `plugins/cron//` or `$HERMES_HOME/plugins//`; an unknown or -unavailable provider falls back to the built-in, so cron is never left without -a trigger. See the [cron internals](../developer-guide/cron-internals.md#gateway-integration) doc. +(the default) uses the built-in in-process ticker. Set it to `chronos` (the +NAS-managed provider for scale-to-zero hosted gateways) — configured via the +`cron.chronos.*` keys (`portal_url`, `callback_url`, `expected_audience`, +`nas_jwks_url`) — or name a custom provider under `plugins/cron//` or +`$HERMES_HOME/plugins//`. An unknown or unavailable provider falls back to +the built-in, so cron is never left without a trigger. See the +[cron internals](../developer-guide/cron-internals.md#gateway-integration) doc. 
## `hermes kanban` From c34840e22e086387e0a1e0d72a50a4c7988b4f81 Mon Sep 17 00:00:00 2001 From: Ben Date: Fri, 19 Jun 2026 12:43:30 +1000 Subject: [PATCH 12/12] fix(cron): serve /api/cron/fire on the dashboard app (hosted-agent surface) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Live-test finding: the Chronos fire webhook was only on the APIServerAdapter (aiohttp), but hosted agents expose `hermes dashboard` (the FastAPI web_server app on :9119) as their public URL — NOT the api_server adapter. So NAS's relay callback to {callback_url}/api/cron/fire could never reach the verifier on a hosted agent (the exact target environment). Two layers were wrong: 1. Wrong server: /api/cron/fire didn't exist on the dashboard app. Added cron_fire_webhook there, alongside the existing /api/cron/* dashboard routes. It resolves the job's profile (_find_cron_job_profile) and runs fire_due via the resolved provider under the cron-profile retarget lock (_fire_cron_job_for_profile, mirroring _call_cron_for_profile) so the CAS claim + run_one_job operate on the right profile's jobs.json. Runs with no live adapters (delivery falls back to the per-platform send path, like the desktop cron path). 202 + background so a long turn never trips NAS's timeout; the store CAS de-dupes a NAS retry. job-not-found -> 200 "gone". 2. Auth gate: the dashboard auth middleware 401s any non-cookie request before the handler runs. Added /api/cron/fire to the shared PUBLIC_API_PATHS so the NAS bearer-JWT callback reaches the verifier — the JWT (purpose=cron_fire), not the cookie, is the real gate. One shared frozenset feeds both the loopback and OAuth middlewares, so no drift. Kept the APIServerAdapter route too (valid self-host api_server surface). Contract doc updated to name the dashboard app as the hosted-agent callback surface. 
Tests: test_cron_fire_dashboard (6) — route registered on the dashboard app, in PUBLIC_API_PATHS, 401 on bad token WITH the cookie gate engaged (proves it's reachable past the gate + JWT is the gate), 400 missing job_id, 200 gone for unknown job, 202 + fire_due invoked for the resolved profile on a valid token. Full hermes_cli + cron + chronos + webhook suites green (7637). Why the original tests missed it: the api_server webhook test built an APIServerAdapter client directly and never asserted which server the hosted public URL exposes — green-but-wrong-integration. The new test pins the route to the dashboard app. --- docs/chronos-managed-cron-contract.md | 8 +- hermes_cli/dashboard_auth/public_paths.py | 6 + hermes_cli/web_server.py | 87 ++++++++++++ tests/hermes_cli/test_cron_fire_dashboard.py | 142 +++++++++++++++++++ 4 files changed, 241 insertions(+), 2 deletions(-) create mode 100644 tests/hermes_cli/test_cron_fire_dashboard.py diff --git a/docs/chronos-managed-cron-contract.md b/docs/chronos-managed-cron-contract.md index 0848d5eb939..64937a9c994 100644 --- a/docs/chronos-managed-cron-contract.md +++ b/docs/chronos-managed-cron-contract.md @@ -114,8 +114,12 @@ Arm (or re-arm, idempotently) exactly one one-shot for a job. ## Inbound `POST /api/cron/fire` (NAS → agent) — agent side, already implemented -This is the agent endpoint NAS calls in Endpoint 3 step 3. Implemented on the -`APIServerAdapter` (`gateway/platforms/api_server.py`); the verifier is +This is the agent endpoint NAS calls in Endpoint 3 step 3. Served by the +**dashboard app** (`hermes_cli/web_server.py`) — the agent's always-reachable +public HTTP surface on hosted deployments (the gateway may be idle/scaled down); +it is in `PUBLIC_API_PATHS` so the dashboard cookie gate lets the bearer-JWT +callback through to the verifier. (Also registered on the optional +`APIServerAdapter` for self-host API-server deployments.) The verifier is `plugins/cron/chronos/verify.py`. 
- **Auth:** `Authorization: Bearer `. The agent verifies: diff --git a/hermes_cli/dashboard_auth/public_paths.py b/hermes_cli/dashboard_auth/public_paths.py index 2699e15c979..349937cffa0 100644 --- a/hermes_cli/dashboard_auth/public_paths.py +++ b/hermes_cli/dashboard_auth/public_paths.py @@ -46,4 +46,10 @@ PUBLIC_API_PATHS: frozenset[str] = frozenset({ # Read-only theme + plugin manifests for the dashboard skin engine. "/api/dashboard/themes", "/api/dashboard/plugins", + # Chronos managed-cron fire webhook (NAS -> agent). NOT cookie-gated: it + # carries its own short-lived NAS-minted JWT (purpose=cron_fire), which the + # handler verifies as the real auth. Must bypass the dashboard auth gate so + # the NAS relay's bearer-only callback reaches the verifier instead of a + # 401 no_cookie. The JWT — not this allowlist — is the security boundary. + "/api/cron/fire", }) diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index a338ebfc131..c3095dd727e 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -7310,6 +7310,93 @@ async def delete_cron_job(job_id: str, profile: Optional[str] = None): return {"ok": True} +def _fire_cron_job_for_profile(profile: str, job_id: str) -> bool: + """Run ONE due cron job end-to-end for ``profile`` via the resolved + scheduler provider's ``fire_due`` (store CAS claim + ``run_one_job``). + + Retargets the ``cron.jobs`` module globals to the profile's cron dir under + the shared lock — same mechanism as ``_call_cron_for_profile`` — so the + claim and the run operate on the right profile's ``jobs.json``. Runs with + no live adapters; delivery falls back to the per-platform send path (the + dashboard process has no gateway adapter handles, exactly like the desktop + cron path above). 
+ """ + _profile_name, home = _cron_profile_home(profile) + with _CRON_PROFILE_LOCK: + from cron import jobs as cron_jobs + from cron.scheduler_provider import resolve_cron_scheduler + + old_cron_dir = cron_jobs.CRON_DIR + old_jobs_file = cron_jobs.JOBS_FILE + old_output_dir = cron_jobs.OUTPUT_DIR + cron_jobs.CRON_DIR = home / "cron" + cron_jobs.JOBS_FILE = cron_jobs.CRON_DIR / "jobs.json" + cron_jobs.OUTPUT_DIR = cron_jobs.CRON_DIR / "output" + try: + provider = resolve_cron_scheduler() + return bool(provider.fire_due(job_id, adapters=None, loop=None)) + finally: + cron_jobs.CRON_DIR = old_cron_dir + cron_jobs.JOBS_FILE = old_jobs_file + cron_jobs.OUTPUT_DIR = old_output_dir + + +@app.post("/api/cron/fire") +async def cron_fire_webhook(request: Request): + """Chronos managed-cron fire webhook (NAS -> agent). + + Authenticated by a short-lived NAS-minted JWT (verified by the pluggable + Chronos fire-verifier), NOT the dashboard session cookie — so this path is + in ``PUBLIC_API_PATHS`` to bypass the dashboard auth gate, and the JWT is + the real gate. This is the inbound half of scale-to-zero managed cron: NAS + POSTs here at fire time, the agent verifies, claims the job (store CAS, so + at-most-once across replicas / on a NAS retry), runs it, and re-arms the + next one-shot. + + Lives on the dashboard app (not the api_server adapter) because the + dashboard is the agent's always-reachable public HTTP surface on hosted + deployments; the gateway may be idle/scaled down. + + Returns 202 immediately and runs the job in the background so a long agent + turn never trips NAS's HTTP timeout. 
+ """ + from plugins.cron.chronos.verify import get_fire_verifier + + auth = request.headers.get("Authorization", "") + token = auth[7:].strip() if auth.startswith("Bearer ") else "" + + cfg = load_config() + claims = get_fire_verifier()( + token=token, + expected_audience=cfg_get(cfg, "cron", "chronos", "expected_audience", default=""), + jwks_or_key=cfg_get(cfg, "cron", "chronos", "nas_jwks_url", default="") or None, + issuer=cfg_get(cfg, "cron", "chronos", "portal_url", default="") or None, + ) + if claims is None: + return JSONResponse({"error": "invalid fire token"}, status_code=401) + + try: + body = await request.json() + except Exception: + body = {} + job_id = (body or {}).get("job_id") if isinstance(body, dict) else None + if not job_id: + return JSONResponse({"error": "missing job_id"}, status_code=400) + + profile = _find_cron_job_profile(job_id) + if not profile: + # Job is gone (cancelled / completed) — nothing to fire. 200 so NAS + # does not retry a fire that is intentionally absent. + return JSONResponse({"status": "gone", "job_id": job_id}, status_code=200) + + # Run in the background; the store CAS claim inside fire_due de-dupes a + # NAS/scheduler retry that arrives while this is in flight. + asyncio.create_task( + asyncio.to_thread(_fire_cron_job_for_profile, profile, job_id) + ) + return JSONResponse({"status": "accepted", "job_id": job_id}, status_code=202) + + # --------------------------------------------------------------------------- # Automation Blueprints — parameterized automation blueprints. The dashboard renders the # slot schema as a form; submitting instantiates a real cron job via the same diff --git a/tests/hermes_cli/test_cron_fire_dashboard.py b/tests/hermes_cli/test_cron_fire_dashboard.py new file mode 100644 index 00000000000..44d6f07c270 --- /dev/null +++ b/tests/hermes_cli/test_cron_fire_dashboard.py @@ -0,0 +1,142 @@ +"""Tests for the Chronos cron-fire webhook ON THE DASHBOARD APP (web_server). 
+ +Regression guard for the relocation bug: the fire webhook MUST live on the +dashboard FastAPI app (`hermes_cli.web_server.app`) — the agent's public HTTP +surface on hosted deployments — not only on the aiohttp APIServerAdapter (which +hosted agents don't expose). It must: + - be a registered route on the dashboard app, + - be in PUBLIC_API_PATHS so the dashboard cookie gate doesn't 401 it before + the JWT verifier runs, + - reject a bad/missing NAS-JWT with 401 (the JWT is the real gate), + - 400 on missing job_id, + - on a valid token, resolve the job's profile and run fire_due in the + background, returning 202. +""" + +import pytest +from starlette.testclient import TestClient + +from hermes_cli import web_server +from hermes_cli.dashboard_auth.public_paths import PUBLIC_API_PATHS + + +def _client(auth_required: bool): + prev_auth = getattr(web_server.app.state, "auth_required", None) + prev_host = getattr(web_server.app.state, "bound_host", None) + web_server.app.state.auth_required = auth_required + web_server.app.state.bound_host = None + client = TestClient(web_server.app) + return client, prev_auth, prev_host + + +def _restore(prev_auth, prev_host): + if prev_auth is None: + if hasattr(web_server.app.state, "auth_required"): + delattr(web_server.app.state, "auth_required") + else: + web_server.app.state.auth_required = prev_auth + if prev_host is None: + if hasattr(web_server.app.state, "bound_host"): + delattr(web_server.app.state, "bound_host") + else: + web_server.app.state.bound_host = prev_host + + +def test_route_registered_on_dashboard_app(): + """The fire webhook is served by the dashboard app (the hosted-agent public + surface), not only the aiohttp adapter.""" + paths = {r.path for r in web_server.app.routes if hasattr(r, "path")} + assert "/api/cron/fire" in paths + + +def test_fire_path_is_public(): + """Must bypass the dashboard cookie gate so the NAS bearer-JWT callback + reaches the verifier (the JWT is the real auth).""" + assert 
"/api/cron/fire" in PUBLIC_API_PATHS + + +def test_bad_token_401(monkeypatch): + """Invalid NAS-JWT -> 401, even with the dashboard auth gate ENGAGED + (proves the route is reachable past the cookie gate and the verifier is the + gate). fire_due must NOT run.""" + fired = [] + monkeypatch.setattr( + "plugins.cron.chronos.verify.get_fire_verifier", + lambda: (lambda **kw: None), # verification fails + ) + monkeypatch.setattr(web_server, "_find_cron_job_profile", lambda jid: "default") + monkeypatch.setattr(web_server, "_fire_cron_job_for_profile", + lambda p, j: fired.append((p, j))) + + client, pa, ph = _client(auth_required=True) + try: + resp = client.post("/api/cron/fire", + headers={"Authorization": "Bearer forged"}, + json={"job_id": "abc"}) + assert resp.status_code == 401 + assert fired == [] + finally: + _restore(pa, ph) + client.close() + + +def test_missing_job_id_400(monkeypatch): + monkeypatch.setattr( + "plugins.cron.chronos.verify.get_fire_verifier", + lambda: (lambda **kw: {"purpose": "cron_fire"}), + ) + client, pa, ph = _client(auth_required=False) + try: + resp = client.post("/api/cron/fire", + headers={"Authorization": "Bearer good"}, + json={}) + assert resp.status_code == 400 + finally: + _restore(pa, ph) + client.close() + + +def test_unknown_job_200_gone(monkeypatch): + """Valid token but the job isn't found in any profile -> 200 'gone' + (NAS shouldn't retry a fire for a cancelled/completed job).""" + monkeypatch.setattr( + "plugins.cron.chronos.verify.get_fire_verifier", + lambda: (lambda **kw: {"purpose": "cron_fire"}), + ) + monkeypatch.setattr(web_server, "_find_cron_job_profile", lambda jid: None) + client, pa, ph = _client(auth_required=False) + try: + resp = client.post("/api/cron/fire", + headers={"Authorization": "Bearer good"}, + json={"job_id": "ghost"}) + assert resp.status_code == 200 + assert resp.json().get("status") == "gone" + finally: + _restore(pa, ph) + client.close() + + +def 
test_valid_token_accepts_and_fires(monkeypatch): + """Valid token + known job -> 202 and fire_due invoked for the resolved + profile.""" + fired = [] + monkeypatch.setattr( + "plugins.cron.chronos.verify.get_fire_verifier", + lambda: (lambda **kw: {"purpose": "cron_fire", "aud": "agent:x"}), + ) + monkeypatch.setattr(web_server, "_find_cron_job_profile", lambda jid: "default") + monkeypatch.setattr(web_server, "_fire_cron_job_for_profile", + lambda p, j: fired.append((p, j)) or True) + + client, pa, ph = _client(auth_required=False) + try: + resp = client.post("/api/cron/fire", + headers={"Authorization": "Bearer good"}, + json={"job_id": "j1"}) + assert resp.status_code == 202 + assert resp.json()["job_id"] == "j1" + finally: + _restore(pa, ph) + client.close() + # background task ran the fire for the resolved profile + assert fired == [("default", "j1")]