hermes-agent/cron/jobs.py
Teknium 3db6b9cc87
feat(cron): add no_agent mode for script-only cron jobs (watchdog pattern) (#19709)
* feat(cron): add no_agent mode for script-only cron jobs (watchdog pattern)

Adds a no_agent=True option to the cronjob system. When enabled, the
scheduler runs the attached script on schedule and delivers its stdout
directly to the job's target — no LLM, no agent loop, no token spend.
This is the classic bash-watchdog pattern (memory alert every 5 min,
disk alert every 15 min, CI ping) reimplemented as a first-class Hermes
primitive instead of a systemd timer + curl + bot token triplet living
outside the system.

## What

  hermes cron create "every 5m" \
    --no-agent \
    --script memory-watchdog.sh \
    --deliver telegram \
    --name memory-watchdog

Agent tool:

  cronjob(action='create',
          schedule='every 5m',
          script='memory-watchdog.sh',
          no_agent=True,
          deliver='telegram')

Semantics:
- Script stdout (trimmed) → delivered verbatim as the message
- Empty stdout          → silent tick (no delivery; watchdog pattern)
- wakeAgent=false gate  → silent tick (same gate LLM jobs use)
- Non-zero exit/timeout → delivered as an error alert
                          (broken watchdogs shouldn't fail silently)
- No LLM ever invoked; no tokens spent; no provider fallback applied
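
The delivery rules above can be sketched as a small pure function. This is an illustration under stated assumptions, not the scheduler's code: `classify_tick`, `TickResult`, and the alert wording are hypothetical names, and the wakeAgent=false gate is left out because its format is not described in this message.

```python
from typing import NamedTuple, Optional


class TickResult(NamedTuple):
    """Hypothetical container describing what one no-agent tick should do."""
    deliver: bool
    message: Optional[str]


def classify_tick(returncode: int, stdout: str, timed_out: bool = False) -> TickResult:
    """Map a script result onto the semantics listed above (illustrative only)."""
    if timed_out:
        # A broken watchdog should not fail silently.
        return TickResult(True, "Script error: timed out")
    if returncode != 0:
        return TickResult(True, f"Script error: exit code {returncode}")
    text = stdout.strip()
    if not text:
        # Empty stdout means a silent tick: nothing is delivered.
        return TickResult(False, None)
    # Trimmed stdout is delivered verbatim.
    return TickResult(True, text)
```

Keeping the decision separate from the subprocess call makes each of the four outcomes easy to unit-test without running a real script.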

## Implementation

cron/jobs.py
  * create_job gains no_agent: bool = False
  * prompt becomes Optional (no_agent jobs don't need one)
  * Validation: no_agent=True requires a script at create time
  * Field roundtrips via load_jobs / save_jobs / update_job

cron/scheduler.py
  * run_job: new short-circuit branch at the top that runs the script,
    wraps its output into the (success, doc, final_response, error)
    tuple downstream delivery already expects, and returns before any
    AIAgent import or construction
  * _run_job_script: picks interpreter by extension — .sh/.bash run
    under /bin/bash, anything else under sys.executable (Python).
    Shell support unlocks the bash-watchdog pattern without wrapping
    scripts in Python. Extension is explicit; we deliberately do NOT
    trust the file's own shebang. Path-containment guard (scripts dir)
    unchanged.
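
A rough sketch of the interpreter choice and the path-containment guard described for _run_job_script. The helper name `build_command` and its `scripts_dir` parameter are assumptions made for illustration; the real function may differ in signature and error handling.

```python
import sys
from pathlib import Path
from typing import List


def build_command(script: str, scripts_dir: Path) -> List[str]:
    """Resolve `script` under `scripts_dir` and pick an interpreter by extension."""
    root = scripts_dir.resolve()
    path = (root / script).resolve()
    # Containment guard: reject anything that resolves outside scripts_dir.
    if root not in path.parents:
        raise ValueError(f"Script path escapes scripts dir: {script!r}")
    # The extension decides the interpreter; the file's shebang is never read.
    if path.suffix in (".sh", ".bash"):
        return ["/bin/bash", str(path)]
    return [sys.executable, str(path)]
```

Under this sketch, `build_command("memory-watchdog.sh", scripts_dir)` starts with `/bin/bash`, while `build_command("../evil.sh", scripts_dir)` raises `ValueError`.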

tools/cronjob_tools.py
  * Schema: new no_agent boolean property with clear trigger guidance
  * cronjob() accepts no_agent and validates mode-specific shape:
    - no_agent=True requires script; prompt/skills optional
    - no_agent=False keeps the existing 'prompt or skill required' rule
  * update path rejects flipping no_agent=True on a job without a script
  * _format_job surfaces no_agent in list output
  * Handler lambda forwards no_agent from tool args

hermes_cli/main.py, hermes_cli/cron.py
  * 'hermes cron create --no-agent' and edit's --no-agent / --agent
    pair for toggling at CLI parity with the agent tool
  * Existing --script help text updated to describe both modes
  * List / create / edit output now shows 'Mode: no-agent (...)' when set

## Tests

tests/cron/test_cron_no_agent.py — 18 tests covering:
  * create_job: no_agent shape, validation, field persistence
  * update_job: flag roundtrip across reload
  * cronjob tool: schema validation, update toggling, mode-specific
    requirements, prompt-relaxation rule
  * run_job short-circuit:
    - success path delivers stdout verbatim
    - empty stdout → SILENT_MARKER (no delivery downstream)
    - wakeAgent=false gate → silent
    - script failure → error alert
    - run_job does NOT import AIAgent (verified via mock)
  * _run_job_script:
    - .sh executes via bash (no shebang required)
    - .bash executes via bash
    - .py still runs via sys.executable (regression)
    - path-traversal still blocked (security regression)

All 18 new tests pass. 341/342 pre-existing cron tests still pass; the
one failure (test_script_empty_output_noted) was already broken on main
and is unrelated to this change.

## Docs

website/docs/guides/cron-script-only.md — new dedicated guide covering
the watchdog pattern, interpreter rules, delivery mapping, worked
examples (memory / disk alerts), and the comparison table vs hermes send,
regular LLM cron jobs, and OS-level cron.

website/docs/user-guide/features/cron.md — new 'No-agent mode' section
in the cron feature reference, cross-linked to the guide.

website/docs/guides/automate-with-cron.md — new tip box pointing users
to no-agent mode when they don't need LLM reasoning.

## Compatibility

- Existing jobs: unchanged. no_agent defaults to False, existing code
  paths untouched until the flag is set.
- Schema additive only; older jobs.json without the field load fine
  via .get() with False default.
- New CLI flags are opt-in and don't alter existing flag behavior.
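
To illustrate the additive-schema point, here is a minimal sketch of reading the flag from job records with and without the field. The two sample records and the `is_no_agent` helper are hypothetical; only the `.get()` with a False default comes from the description above.

```python
import json

# A record written before the no_agent field existed (hypothetical sample).
legacy = json.loads('{"id": "abc123", "prompt": "summarize logs", "script": null}')
# A record written after this change (hypothetical sample).
current = json.loads(
    '{"id": "def456", "prompt": null, "script": "memory-watchdog.sh", "no_agent": true}'
)


def is_no_agent(job: dict) -> bool:
    """Treat a missing field as the default (False), so old jobs keep the LLM path."""
    return bool(job.get("no_agent", False))
```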

* fix(cron): lazy-import AIAgent + SessionDB so no_agent ticks pay zero

The unconditional `from run_agent import AIAgent` + SessionDB() init at
the top of run_job() meant every no_agent tick still paid the full agent
module load cost (~300ms + transitive imports + DB open) even though it
never touched any of that machinery.

Move both to live under the default (LLM) path, after the no_agent
short-circuit has returned. Now a no_agent tick's sys.modules stays
clean — verified end-to-end:

    assert 'run_agent' not in sys.modules  # before
    run_job(no_agent_job)
    assert 'run_agent' not in sys.modules  # after

The existing mock-based unit test (test_run_job_no_agent_never_invokes_aiagent)
kept passing because patch() replaces the class AFTER import; the leak
was only visible via real subprocess-style verification. End-to-end
demo confirmed: agent calls cronjob(no_agent=True) → script runs →
stdout delivered → no LLM machinery loaded.
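
The lazy-import idea can be reproduced in isolation. The sketch below uses a throwaway stand-in module (`heavy_agent_stub`, a hypothetical name created on the fly) in place of run_agent, and a simplified `run_job` that only shows where the import sits relative to the early return.

```python
import sys
import tempfile
from pathlib import Path

# Stand-in for the heavy agent module (hypothetical, written to a temp dir).
_stub_dir = tempfile.mkdtemp()
Path(_stub_dir, "heavy_agent_stub.py").write_text("class Agent:\n    name = 'agent'\n")
sys.path.insert(0, _stub_dir)


def run_job(job: dict) -> str:
    """Return early for no_agent jobs, before the heavy import is reached."""
    if job.get("no_agent"):
        return "script-only"
    # Imported lazily so script-only ticks never load it.
    from heavy_agent_stub import Agent
    return Agent.name


run_job({"no_agent": True})
loaded_after_no_agent = "heavy_agent_stub" in sys.modules
run_job({"no_agent": False})
loaded_after_agent = "heavy_agent_stub" in sys.modules
```

Checking `sys.modules` directly, rather than patching the class, is what exposes an eager import: a mock replaces the attribute only after the module has already been loaded.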

* docs(cron): tighten no_agent tool schema — defaults, silent semantics, pick rule

Previous description buried the important bits in one long sentence.
Agents could plausibly miss three things an LLM-facing schema should
make unmissable:

1. What the default is — now first sentence + JSON Schema `default: false`
2. What 'silent run' actually means for the user — now spelled out:
   'nothing is sent to the user and they won't see anything happened'
3. When to pick True vs False — now a concrete decision rule with
   examples on both sides (watchdogs/metrics/pollers → True;
   summarize/draft/pick/rephrase → False)

Also adds explicit 'prompt and skills are ignored when True' since the
agent could otherwise still pass them out of habit.

No behavior change — schema text only.
2026-05-04 12:31:01 -07:00

1044 lines
38 KiB
Python

"""
Cron job storage and management.
Jobs are stored in ~/.hermes/cron/jobs.json
Output is saved to ~/.hermes/cron/output/{job_id}/{timestamp}.md
"""
import copy
import json
import logging
import tempfile
import threading
import os
import re
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from hermes_constants import get_hermes_home
from typing import Optional, Dict, List, Any, Union
logger = logging.getLogger(__name__)
from hermes_time import now as _hermes_now
from utils import atomic_replace
try:
    from croniter import croniter
    HAS_CRONITER = True
except ImportError:
    HAS_CRONITER = False
# =============================================================================
# Configuration
# =============================================================================
HERMES_DIR = get_hermes_home().resolve()
CRON_DIR = HERMES_DIR / "cron"
JOBS_FILE = CRON_DIR / "jobs.json"
# In-process lock protecting load_jobs→modify→save_jobs cycles.
# Required when tick() runs jobs in parallel threads — without this,
# concurrent mark_job_run / advance_next_run calls can clobber each other.
_jobs_file_lock = threading.Lock()
OUTPUT_DIR = CRON_DIR / "output"
ONESHOT_GRACE_SECONDS = 120


def _normalize_skill_list(skill: Optional[str] = None, skills: Optional[Any] = None) -> List[str]:
    """Normalize legacy/single-skill and multi-skill inputs into a unique ordered list."""
    if skills is None:
        raw_items = [skill] if skill else []
    elif isinstance(skills, str):
        raw_items = [skills]
    else:
        raw_items = list(skills)

    normalized: List[str] = []
    for item in raw_items:
        text = str(item or "").strip()
        if text and text not in normalized:
            normalized.append(text)
    return normalized


def _apply_skill_fields(job: Dict[str, Any]) -> Dict[str, Any]:
    """Return a job dict with canonical `skills` and legacy `skill` fields aligned."""
    normalized = dict(job)
    skills = _normalize_skill_list(normalized.get("skill"), normalized.get("skills"))
    normalized["skills"] = skills
    normalized["skill"] = skills[0] if skills else None
    return normalized


def _secure_dir(path: Path):
    """Set directory to owner-only access (0700). No-op on Windows."""
    try:
        os.chmod(path, 0o700)
    except (OSError, NotImplementedError):
        pass  # Windows or other platforms where chmod is not supported


def _secure_file(path: Path):
    """Set file to owner-only read/write (0600). No-op on Windows."""
    try:
        if path.exists():
            os.chmod(path, 0o600)
    except (OSError, NotImplementedError):
        pass


def ensure_dirs():
    """Ensure cron directories exist with secure permissions."""
    CRON_DIR.mkdir(parents=True, exist_ok=True)
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    _secure_dir(CRON_DIR)
    _secure_dir(OUTPUT_DIR)

# =============================================================================
# Schedule Parsing
# =============================================================================

def parse_duration(s: str) -> int:
    """
    Parse duration string into minutes.

    Examples:
        "30m" → 30
        "2h" → 120
        "1d" → 1440
    """
    s = s.strip().lower()
    match = re.match(r'^(\d+)\s*(m|min|mins|minute|minutes|h|hr|hrs|hour|hours|d|day|days)$', s)
    if not match:
        raise ValueError(f"Invalid duration: '{s}'. Use format like '30m', '2h', or '1d'")

    value = int(match.group(1))
    unit = match.group(2)[0]  # First char: m, h, or d

    multipliers = {'m': 1, 'h': 60, 'd': 1440}
    return value * multipliers[unit]


def parse_schedule(schedule: str) -> Dict[str, Any]:
    """
    Parse schedule string into structured format.

    Returns dict with:
        - kind: "once" | "interval" | "cron"
        - For "once": "run_at" (ISO timestamp)
        - For "interval": "minutes" (int)
        - For "cron": "expr" (cron expression)

    Examples:
        "30m" → once in 30 minutes
        "2h" → once in 2 hours
        "every 30m" → recurring every 30 minutes
        "every 2h" → recurring every 2 hours
        "0 9 * * *" → cron expression
        "2026-02-03T14:00" → once at timestamp
    """
    schedule = schedule.strip()
    original = schedule
    schedule_lower = schedule.lower()

    # "every X" pattern → recurring interval
    if schedule_lower.startswith("every "):
        duration_str = schedule[6:].strip()
        minutes = parse_duration(duration_str)
        return {
            "kind": "interval",
            "minutes": minutes,
            "display": f"every {minutes}m"
        }

    # Check for cron expression (5 or 6 space-separated fields)
    # Cron fields: minute hour day month weekday [year]
    parts = schedule.split()
    if len(parts) >= 5 and all(
        re.match(r'^[\d\*\-,/]+$', p) for p in parts[:5]
    ):
        if not HAS_CRONITER:
            raise ValueError("Cron expressions require 'croniter' package. Install with: pip install croniter")
        # Validate cron expression
        try:
            croniter(schedule)
        except Exception as e:
            raise ValueError(f"Invalid cron expression '{schedule}': {e}")
        return {
            "kind": "cron",
            "expr": schedule,
            "display": schedule
        }

    # ISO timestamp (contains T or looks like date)
    if 'T' in schedule or re.match(r'^\d{4}-\d{2}-\d{2}', schedule):
        try:
            # Parse and validate
            dt = datetime.fromisoformat(schedule.replace('Z', '+00:00'))
            # Make naive timestamps timezone-aware at parse time so the stored
            # value doesn't depend on the system timezone matching at check time.
            if dt.tzinfo is None:
                dt = dt.astimezone()  # Interpret as local timezone
            return {
                "kind": "once",
                "run_at": dt.isoformat(),
                "display": f"once at {dt.strftime('%Y-%m-%d %H:%M')}"
            }
        except ValueError as e:
            raise ValueError(f"Invalid timestamp '{schedule}': {e}")

    # Duration like "30m", "2h", "1d" → one-shot from now
    try:
        minutes = parse_duration(schedule)
        run_at = _hermes_now() + timedelta(minutes=minutes)
        return {
            "kind": "once",
            "run_at": run_at.isoformat(),
            "display": f"once in {original}"
        }
    except ValueError:
        pass

    raise ValueError(
        f"Invalid schedule '{original}'. Use:\n"
        f" - Duration: '30m', '2h', '1d' (one-shot)\n"
        f" - Interval: 'every 30m', 'every 2h' (recurring)\n"
        f" - Cron: '0 9 * * *' (cron expression)\n"
        f" - Timestamp: '2026-02-03T14:00:00' (one-shot at time)"
    )


def _ensure_aware(dt: datetime) -> datetime:
    """Return a timezone-aware datetime in Hermes configured timezone.

    Backward compatibility:
    - Older stored timestamps may be naive.
    - Naive values are interpreted as *system-local wall time* (the timezone
      `datetime.now()` used when they were created), then converted to the
      configured Hermes timezone.

    This preserves relative ordering for legacy naive timestamps across
    timezone changes and avoids false not-due results.
    """
    target_tz = _hermes_now().tzinfo
    if dt.tzinfo is None:
        local_tz = datetime.now().astimezone().tzinfo
        return dt.replace(tzinfo=local_tz).astimezone(target_tz)
    return dt.astimezone(target_tz)


def _recoverable_oneshot_run_at(
    schedule: Dict[str, Any],
    now: datetime,
    *,
    last_run_at: Optional[str] = None,
) -> Optional[str]:
    """Return a one-shot run time if it is still eligible to fire.

    One-shot jobs get a small grace window so jobs created a few seconds after
    their requested minute still run on the next tick. Once a one-shot has
    already run, it is never eligible again.
    """
    if schedule.get("kind") != "once":
        return None
    if last_run_at:
        return None

    run_at = schedule.get("run_at")
    if not run_at:
        return None

    run_at_dt = _ensure_aware(datetime.fromisoformat(run_at))
    if run_at_dt >= now - timedelta(seconds=ONESHOT_GRACE_SECONDS):
        return run_at
    return None


def _compute_grace_seconds(schedule: dict) -> int:
    """Compute how late a job can be and still catch up instead of fast-forwarding.

    Uses half the schedule period, clamped between 120 seconds and 2 hours.
    This ensures daily jobs can catch up if missed by up to 2 hours,
    while frequent jobs (every 5-10 min) still fast-forward quickly.
    """
    MIN_GRACE = 120
    MAX_GRACE = 7200  # 2 hours

    kind = schedule.get("kind")

    if kind == "interval":
        period_seconds = schedule.get("minutes", 1) * 60
        grace = period_seconds // 2
        return max(MIN_GRACE, min(grace, MAX_GRACE))

    if kind == "cron" and HAS_CRONITER:
        try:
            now = _hermes_now()
            cron = croniter(schedule["expr"], now)
            first = cron.get_next(datetime)
            second = cron.get_next(datetime)
            period_seconds = int((second - first).total_seconds())
            grace = period_seconds // 2
            return max(MIN_GRACE, min(grace, MAX_GRACE))
        except Exception:
            pass

    return MIN_GRACE


def compute_next_run(schedule: Dict[str, Any], last_run_at: Optional[str] = None) -> Optional[str]:
    """
    Compute the next run time for a schedule.

    Returns ISO timestamp string, or None if no more runs.
    """
    now = _hermes_now()

    if schedule["kind"] == "once":
        return _recoverable_oneshot_run_at(schedule, now, last_run_at=last_run_at)

    elif schedule["kind"] == "interval":
        minutes = schedule["minutes"]
        if last_run_at:
            # Next run is last_run + interval
            last = _ensure_aware(datetime.fromisoformat(last_run_at))
            next_run = last + timedelta(minutes=minutes)
        else:
            # First run is now + interval
            next_run = now + timedelta(minutes=minutes)
        return next_run.isoformat()

    elif schedule["kind"] == "cron":
        if not HAS_CRONITER:
            logger.warning(
                "Cannot compute next run for cron schedule %r: 'croniter' is "
                "not installed. croniter is a core dependency as of v0.9.x; "
                "reinstall hermes-agent or run 'pip install croniter' in your "
                "runtime env.",
                schedule.get("expr"),
            )
            return None
        # Use last_run_at as the croniter base when available, consistent
        # with interval jobs. This ensures that after a crash/restart,
        # the next run is anchored to the actual last execution time
        # rather than to an arbitrary restart time.
        base_time = now
        if last_run_at:
            base_time = _ensure_aware(datetime.fromisoformat(last_run_at))
        cron = croniter(schedule["expr"], base_time)
        next_run = cron.get_next(datetime)
        return next_run.isoformat()

    return None

# =============================================================================
# Job CRUD Operations
# =============================================================================

def load_jobs() -> List[Dict[str, Any]]:
    """Load all jobs from storage."""
    ensure_dirs()
    if not JOBS_FILE.exists():
        return []

    try:
        with open(JOBS_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
            return data.get("jobs", [])
    except json.JSONDecodeError:
        # Retry with strict=False to handle bare control chars in string values
        try:
            with open(JOBS_FILE, 'r', encoding='utf-8') as f:
                data = json.loads(f.read(), strict=False)
                jobs = data.get("jobs", [])
                if jobs:
                    # Auto-repair: rewrite with proper escaping
                    save_jobs(jobs)
                    logger.warning("Auto-repaired jobs.json (had invalid control characters)")
                return jobs
        except Exception as e:
            logger.error("Failed to auto-repair jobs.json: %s", e)
            raise RuntimeError(f"Cron database corrupted and unrepairable: {e}") from e
    except IOError as e:
        logger.error("IOError reading jobs.json: %s", e)
        raise RuntimeError(f"Failed to read cron database: {e}") from e


def save_jobs(jobs: List[Dict[str, Any]]):
    """Save all jobs to storage."""
    ensure_dirs()
    fd, tmp_path = tempfile.mkstemp(dir=str(JOBS_FILE.parent), suffix='.tmp', prefix='.jobs_')
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump({"jobs": jobs, "updated_at": _hermes_now().isoformat()}, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        atomic_replace(tmp_path, JOBS_FILE)
        _secure_file(JOBS_FILE)
    except BaseException:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise


def _normalize_workdir(workdir: Optional[str]) -> Optional[str]:
    """Normalize and validate a cron job workdir.

    Rules:
    - Empty / None → None (feature off, preserves old behaviour).
    - ``~`` is expanded. Relative paths are rejected — cron jobs run detached
      from any shell cwd, so relative paths have no stable meaning.
    - The path must exist and be a directory at create/update time. We do
      NOT re-check at run time (a user might briefly unmount the dir; the
      scheduler will just fall back to old behaviour with a logged warning).

    Returns the absolute path string, or None when disabled.
    Raises ValueError on invalid input.
    """
    if workdir is None:
        return None
    raw = str(workdir).strip()
    if not raw:
        return None
    expanded = Path(raw).expanduser()
    if not expanded.is_absolute():
        raise ValueError(
            f"Cron workdir must be an absolute path (got {raw!r}). "
            f"Cron jobs run detached from any shell cwd, so relative paths are ambiguous."
        )
    resolved = expanded.resolve()
    if not resolved.exists():
        raise ValueError(f"Cron workdir does not exist: {resolved}")
    if not resolved.is_dir():
        raise ValueError(f"Cron workdir is not a directory: {resolved}")
    return str(resolved)


def create_job(
    prompt: Optional[str],
    schedule: str,
    name: Optional[str] = None,
    repeat: Optional[int] = None,
    deliver: Optional[str] = None,
    origin: Optional[Dict[str, Any]] = None,
    skill: Optional[str] = None,
    skills: Optional[List[str]] = None,
    model: Optional[str] = None,
    provider: Optional[str] = None,
    base_url: Optional[str] = None,
    script: Optional[str] = None,
    context_from: Optional[Union[str, List[str]]] = None,
    enabled_toolsets: Optional[List[str]] = None,
    workdir: Optional[str] = None,
    no_agent: bool = False,
) -> Dict[str, Any]:
    """
    Create a new cron job.

    Args:
        prompt: The prompt to run (must be self-contained, or a task instruction when skill is set).
            Ignored when ``no_agent=True`` except as an optional name hint.
        schedule: Schedule string (see parse_schedule)
        name: Optional friendly name
        repeat: How many times to run (None = forever, 1 = once)
        deliver: Where to deliver output ("origin", "local", "telegram", etc.)
        origin: Source info where job was created (for "origin" delivery)
        skill: Optional legacy single skill name to load before running the prompt
        skills: Optional ordered list of skills to load before running the prompt
        model: Optional per-job model override
        provider: Optional per-job provider override
        base_url: Optional per-job base URL override
        script: Optional path to a script whose stdout feeds the job. With
            ``no_agent=True`` the script IS the job — its stdout is
            delivered verbatim. Without ``no_agent``, its stdout is
            injected into the agent's prompt as context (data-collection /
            change-detection pattern). Paths resolve under
            ~/.hermes/scripts/; ``.sh`` / ``.bash`` files run via bash,
            anything else via Python.
        context_from: Optional job ID (or list of job IDs) whose most recent output
            is injected into the prompt as context before each run.
            Useful for chaining cron jobs: job A finds data, job B processes it.
        enabled_toolsets: Optional list of toolset names to restrict the agent to.
            When set, only tools from these toolsets are loaded, reducing
            token overhead. When omitted, all default tools are loaded.
            Ignored when ``no_agent=True``.
        workdir: Optional absolute path. When set, the job runs as if launched
            from that directory: AGENTS.md / CLAUDE.md / .cursorrules from
            that directory are injected into the system prompt, and the
            terminal/file/code_exec tools use it as their working directory
            (via TERMINAL_CWD). When unset, the old behaviour is preserved
            (no context files injected, tools use the scheduler's cwd).
            With ``no_agent=True``, ``workdir`` is still applied as the
            script's cwd so relative paths inside the script behave
            predictably.
        no_agent: When True, skip the agent entirely — run ``script`` on schedule
            and deliver its stdout directly. Empty stdout = silent (no
            delivery). Requires ``script`` to be set. Ideal for classic
            watchdogs and periodic alerts that don't need LLM reasoning.

    Returns:
        The created job dict
    """
    parsed_schedule = parse_schedule(schedule)

    # Normalize repeat: treat 0 or negative values as None (infinite)
    if repeat is not None and repeat <= 0:
        repeat = None

    # Auto-set repeat=1 for one-shot schedules if not specified
    if parsed_schedule["kind"] == "once" and repeat is None:
        repeat = 1

    # Default delivery to origin if available, otherwise local
    if deliver is None:
        deliver = "origin" if origin else "local"

    job_id = uuid.uuid4().hex[:12]
    now = _hermes_now().isoformat()

    normalized_skills = _normalize_skill_list(skill, skills)
    normalized_model = str(model).strip() if isinstance(model, str) else None
    normalized_provider = str(provider).strip() if isinstance(provider, str) else None
    normalized_base_url = str(base_url).strip().rstrip("/") if isinstance(base_url, str) else None
    normalized_model = normalized_model or None
    normalized_provider = normalized_provider or None
    normalized_base_url = normalized_base_url or None
    normalized_script = str(script).strip() if isinstance(script, str) else None
    normalized_script = normalized_script or None
    normalized_toolsets = [str(t).strip() for t in enabled_toolsets if str(t).strip()] if enabled_toolsets else None
    normalized_toolsets = normalized_toolsets or None
    normalized_workdir = _normalize_workdir(workdir)
    normalized_no_agent = bool(no_agent)

    # no_agent jobs are meaningless without a script — the script IS the job.
    # Surface this as a clear ValueError at create time so bad configs never
    # reach the scheduler.
    if normalized_no_agent and not normalized_script:
        raise ValueError(
            "no_agent=True requires a script — with no agent and no script "
            "there is nothing for the job to run."
        )

    # Normalize context_from: accept str or list of str, store as list or None
    if isinstance(context_from, str):
        context_from = [context_from.strip()] if context_from.strip() else None
    elif isinstance(context_from, list):
        context_from = [str(j).strip() for j in context_from if str(j).strip()] or None
    else:
        context_from = None

    label_source = (prompt or (normalized_skills[0] if normalized_skills else None) or (normalized_script if normalized_no_agent else None)) or "cron job"
    job = {
        "id": job_id,
        "name": name or label_source[:50].strip(),
        "prompt": prompt,
        "skills": normalized_skills,
        "skill": normalized_skills[0] if normalized_skills else None,
        "model": normalized_model,
        "provider": normalized_provider,
        "base_url": normalized_base_url,
        "script": normalized_script,
        "no_agent": normalized_no_agent,
        "context_from": context_from,
        "schedule": parsed_schedule,
        "schedule_display": parsed_schedule.get("display", schedule),
        "repeat": {
            "times": repeat,  # None = forever
            "completed": 0
        },
        "enabled": True,
        "state": "scheduled",
        "paused_at": None,
        "paused_reason": None,
        "created_at": now,
        "next_run_at": compute_next_run(parsed_schedule),
        "last_run_at": None,
        "last_status": None,
        "last_error": None,
        "last_delivery_error": None,
        # Delivery configuration
        "deliver": deliver,
        "origin": origin,  # Tracks where job was created for "origin" delivery
        "enabled_toolsets": normalized_toolsets,
        "workdir": normalized_workdir,
    }

    jobs = load_jobs()
    jobs.append(job)
    save_jobs(jobs)

    return job


def get_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Get a job by ID."""
    jobs = load_jobs()
    for job in jobs:
        if job["id"] == job_id:
            return _apply_skill_fields(job)
    return None


def list_jobs(include_disabled: bool = False) -> List[Dict[str, Any]]:
    """List all jobs, optionally including disabled ones."""
    jobs = [_apply_skill_fields(j) for j in load_jobs()]
    if not include_disabled:
        jobs = [j for j in jobs if j.get("enabled", True)]
    return jobs


def update_job(job_id: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Update a job by ID, refreshing derived schedule fields when needed."""
    jobs = load_jobs()
    for i, job in enumerate(jobs):
        if job["id"] != job_id:
            continue

        # Validate / normalize workdir if present in updates. Empty string or
        # None both mean "clear the field" (restore old behaviour).
        if "workdir" in updates:
            _wd = updates["workdir"]
            if _wd in (None, "", False):
                updates["workdir"] = None
            else:
                updates["workdir"] = _normalize_workdir(_wd)

        updated = _apply_skill_fields({**job, **updates})
        schedule_changed = "schedule" in updates

        if "skills" in updates or "skill" in updates:
            normalized_skills = _normalize_skill_list(updated.get("skill"), updated.get("skills"))
            updated["skills"] = normalized_skills
            updated["skill"] = normalized_skills[0] if normalized_skills else None

        if schedule_changed:
            updated_schedule = updated["schedule"]
            # The API may pass schedule as a raw string (e.g. "every 10m")
            # instead of a pre-parsed dict. Normalize it the same way
            # create_job() does so downstream code can call .get() safely.
            if isinstance(updated_schedule, str):
                updated_schedule = parse_schedule(updated_schedule)
                updated["schedule"] = updated_schedule
            updated["schedule_display"] = updates.get(
                "schedule_display",
                updated_schedule.get("display", updated.get("schedule_display")),
            )
            if updated.get("state") != "paused":
                updated["next_run_at"] = compute_next_run(updated_schedule)

        if updated.get("enabled", True) and updated.get("state") != "paused" and not updated.get("next_run_at"):
            updated["next_run_at"] = compute_next_run(updated["schedule"])

        jobs[i] = updated
        save_jobs(jobs)
        return _apply_skill_fields(jobs[i])
    return None


def pause_job(job_id: str, reason: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Pause a job without deleting it."""
    return update_job(
        job_id,
        {
            "enabled": False,
            "state": "paused",
            "paused_at": _hermes_now().isoformat(),
            "paused_reason": reason,
        },
    )


def resume_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Resume a paused job and compute the next future run from now."""
    job = get_job(job_id)
    if not job:
        return None

    next_run_at = compute_next_run(job["schedule"])
    return update_job(
        job_id,
        {
            "enabled": True,
            "state": "scheduled",
            "paused_at": None,
            "paused_reason": None,
            "next_run_at": next_run_at,
        },
    )


def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Schedule a job to run on the next scheduler tick."""
    job = get_job(job_id)
    if not job:
        return None
    return update_job(
        job_id,
        {
            "enabled": True,
            "state": "scheduled",
            "paused_at": None,
            "paused_reason": None,
            "next_run_at": _hermes_now().isoformat(),
        },
    )


def remove_job(job_id: str) -> bool:
    """Remove a job by ID."""
    jobs = load_jobs()
    original_len = len(jobs)
    jobs = [j for j in jobs if j["id"] != job_id]
    if len(jobs) < original_len:
        save_jobs(jobs)
        return True
    return False


def mark_job_run(job_id: str, success: bool, error: Optional[str] = None,
                 delivery_error: Optional[str] = None):
    """
    Mark a job as having been run.

    Updates last_run_at, last_status, increments completed count,
    computes next_run_at, and auto-deletes if repeat limit reached.

    ``delivery_error`` is tracked separately from the agent error — a job
    can succeed (agent produced output) but fail delivery (platform down).
    """
    with _jobs_file_lock:
        jobs = load_jobs()
        for i, job in enumerate(jobs):
            if job["id"] == job_id:
                now = _hermes_now().isoformat()
                job["last_run_at"] = now
                job["last_status"] = "ok" if success else "error"
                job["last_error"] = error if not success else None
                # Track delivery failures separately — cleared on successful delivery
                job["last_delivery_error"] = delivery_error

                # Increment completed count
                if job.get("repeat"):
                    job["repeat"]["completed"] = job["repeat"].get("completed", 0) + 1

                    # Check if we've hit the repeat limit
                    times = job["repeat"].get("times")
                    completed = job["repeat"]["completed"]
                    if times is not None and times > 0 and completed >= times:
                        # Remove the job (limit reached)
                        jobs.pop(i)
                        save_jobs(jobs)
                        return

                # Compute next run
                job["next_run_at"] = compute_next_run(job["schedule"], now)

                # If no next run, decide whether this is terminal completion
                # (one-shot) or a transient failure (recurring schedule couldn't
                # compute — e.g. 'croniter' missing from the runtime env).
                # Recurring jobs must NEVER be silently disabled: that turns a
                # missing runtime dep into "job completed" and the user's
                # schedule quietly goes off. See issue #16265.
                if job["next_run_at"] is None:
                    kind = job.get("schedule", {}).get("kind")
                    if kind in ("cron", "interval"):
                        job["state"] = "error"
                        if not job.get("last_error"):
                            job["last_error"] = (
                                "Failed to compute next run for recurring "
                                "schedule (is the 'croniter' package "
                                "installed in the gateway's Python env?)"
                            )
                        logger.error(
                            "Job '%s' (%s) could not compute next_run_at; "
                            "leaving enabled and marking state=error so the "
                            "job is not silently disabled.",
                            job.get("name", job["id"]),
                            kind,
                        )
                    else:
                        job["enabled"] = False
                        job["state"] = "completed"
                elif job.get("state") != "paused":
                    job["state"] = "scheduled"

                save_jobs(jobs)
                return

        logger.warning("mark_job_run: job_id %s not found, skipping save", job_id)


def advance_next_run(job_id: str) -> bool:
    """Preemptively advance next_run_at for a recurring job before execution.

    Call this BEFORE run_job() so that if the process crashes mid-execution,
    the job won't re-fire on the next gateway restart. This converts the
    scheduler from at-least-once to at-most-once for recurring jobs — missing
    one run is far better than firing dozens of times in a crash loop.

    One-shot jobs are left unchanged so they can still retry on restart.

    Returns True if next_run_at was advanced, False otherwise.
    """
    with _jobs_file_lock:
        jobs = load_jobs()
        for job in jobs:
            if job["id"] == job_id:
                kind = job.get("schedule", {}).get("kind")
                if kind not in ("cron", "interval"):
                    return False
                now = _hermes_now().isoformat()
                new_next = compute_next_run(job["schedule"], now)
                if new_next and new_next != job.get("next_run_at"):
                    job["next_run_at"] = new_next
                    save_jobs(jobs)
                    return True
                return False
        return False


def get_due_jobs() -> List[Dict[str, Any]]:
    """Get all jobs that are due to run now.

    For recurring jobs (cron/interval), if the scheduled time is stale
    (more than one period in the past, e.g. because the gateway was down),
    the job is fast-forwarded to the next future run instead of firing
    immediately. This prevents a burst of missed jobs on gateway restart.
    """
    now = _hermes_now()
    raw_jobs = load_jobs()
    jobs = [_apply_skill_fields(j) for j in copy.deepcopy(raw_jobs)]
    due = []
    needs_save = False

    for job in jobs:
        if not job.get("enabled", True):
            continue

        next_run = job.get("next_run_at")
        if not next_run:
            schedule = job.get("schedule", {})
            kind = schedule.get("kind")

            # One-shot jobs use a small grace window via the dedicated helper.
            recovered_next = _recoverable_oneshot_run_at(
                schedule,
                now,
                last_run_at=job.get("last_run_at"),
            )
            recovery_kind = "one-shot" if recovered_next else None

            # Recurring jobs reach here only when something (typically a
            # direct jobs.json edit that bypassed add_job()) left
            # next_run_at unset. Without this branch, such jobs are
            # silently skipped forever; recompute next_run_at from the
            # schedule so they pick up at their next scheduled tick.
            if not recovered_next and kind in ("cron", "interval"):
                recovered_next = compute_next_run(schedule, now.isoformat())
                if recovered_next:
                    recovery_kind = kind

            if not recovered_next:
                continue

            job["next_run_at"] = recovered_next
            next_run = recovered_next
            logger.info(
                "Job '%s' had no next_run_at; recovering %s run at %s",
                job.get("name", job["id"]),
                recovery_kind,
                recovered_next,
            )
            for rj in raw_jobs:
                if rj["id"] == job["id"]:
                    rj["next_run_at"] = recovered_next
                    needs_save = True
                    break

        next_run_dt = _ensure_aware(datetime.fromisoformat(next_run))
        if next_run_dt <= now:
            schedule = job.get("schedule", {})
            kind = schedule.get("kind")

            # For recurring jobs, check if the scheduled time is stale
            # (gateway was down and missed the window). Fast-forward to
            # the next future occurrence instead of firing a stale run.
            grace = _compute_grace_seconds(schedule)
            if kind in ("cron", "interval") and (now - next_run_dt).total_seconds() > grace:
                # Job is past its catch-up grace window, so this is a stale missed run.
                # Grace scales with schedule period: daily=2h, hourly=30m, 10min=5m.
                new_next = compute_next_run(schedule, now.isoformat())
                if new_next:
                    logger.info(
                        "Job '%s' missed its scheduled time (%s, grace=%ds). "
                        "Fast-forwarding to next run: %s",
                        job.get("name", job["id"]),
                        next_run,
                        grace,
                        new_next,
                    )
                    # Update the job in storage
                    for rj in raw_jobs:
                        if rj["id"] == job["id"]:
                            rj["next_run_at"] = new_next
                            needs_save = True
                            break
                    continue  # Skip this run

            due.append(job)

    if needs_save:
        save_jobs(raw_jobs)

    return due
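
# Illustrative sketch (not part of this module): the stale-run check used in
# get_due_jobs(). _compute_grace_seconds() is not shown in this section, so
# grace_seconds() below is an ASSUMED formula (half the period, capped at two
# hours) chosen only because it reproduces the three examples in the comment
# above (daily=2h, hourly=30m, 10min=5m). The real helper may differ.

```python
from datetime import datetime, timezone


def grace_seconds(period_seconds: int, max_grace: int = 7200) -> int:
    """Assumed grace window: half the schedule period, capped at max_grace."""
    return min(period_seconds // 2, max_grace)


def is_stale(next_run: datetime, now: datetime, period_seconds: int) -> bool:
    """True when a due run is older than its grace window and should be skipped."""
    return (now - next_run).total_seconds() > grace_seconds(period_seconds)


scheduled = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)

# Hourly job, 20 minutes late: inside the 30-minute window, so it still fires.
late_but_ok = is_stale(scheduled, datetime(2024, 1, 1, 0, 20, tzinfo=timezone.utc), 3600)

# Hourly job, 45 minutes late: past the window, so it is fast-forwarded instead.
too_late = is_stale(scheduled, datetime(2024, 1, 1, 0, 45, tzinfo=timezone.utc), 3600)
```

# The comparison is strict (>), matching the source: a run that is exactly
# `grace` seconds late is still treated as due.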
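

def save_job_output(job_id: str, output: str):
    """Atomically write job output to a timestamped Markdown file.

    The file is written to ``OUTPUT_DIR/<job_id>/<timestamp>.md`` via a
    temporary file in the same directory, which is fsynced and then moved
    into place so readers never observe a partially written file. The
    timestamp has one-second resolution, so two saves for the same job
    within the same second target the same file name.

    Returns the path of the written file.
    """
    ensure_dirs()
    job_output_dir = OUTPUT_DIR / job_id
    job_output_dir.mkdir(parents=True, exist_ok=True)
    _secure_dir(job_output_dir)

    timestamp = _hermes_now().strftime("%Y-%m-%d_%H-%M-%S")
    output_file = job_output_dir / f"{timestamp}.md"

    fd, tmp_path = tempfile.mkstemp(dir=str(job_output_dir), suffix='.tmp', prefix='.output_')
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            f.write(output)
            f.flush()
            os.fsync(f.fileno())
        atomic_replace(tmp_path, output_file)
        _secure_file(output_file)
    except BaseException:
        # Clean up the temp file on any failure, then re-raise.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise

    return output_file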
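
# Illustrative sketch (not part of this module): the write-to-temp-then-rename
# pattern that save_job_output() follows, using only the standard library.
# write_atomically() and demo() are hypothetical names; os.replace() stands in
# for the module's atomic_replace() helper, whose implementation is not shown
# in this section, and the permission helpers are omitted.

```python
import os
import tempfile
from pathlib import Path
from typing import List, Tuple


def write_atomically(target: Path, text: str) -> Path:
    """Write text to target so readers see either the old file or the new one."""
    # The temp file lives in the target directory so the final rename stays
    # on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=str(target.parent), suffix=".tmp", prefix=".output_")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(text)
            f.flush()
            os.fsync(f.fileno())  # push the bytes to disk before renaming
        os.replace(tmp_path, target)
    except BaseException:
        # Remove the temp file on failure, then re-raise the original error.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
    return target


def demo() -> Tuple[str, List[str]]:
    """Write one file into a throwaway directory and report what is left there."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = write_atomically(Path(tmp_dir) / "out.md", "hello")
        content = path.read_text(encoding="utf-8")
        names = sorted(p.name for p in Path(tmp_dir).iterdir())
    return content, names
```

# demo() leaves only out.md in the directory: on success the temp file is
# consumed by the rename, so no .output_*.tmp residue remains.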
# =============================================================================
# Skill reference rewriting (curator integration)
# =============================================================================


def rewrite_skill_refs(
    consolidated: Optional[Dict[str, str]] = None,
    pruned: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Rewrite cron job skill references after a curator consolidation pass.

    When the curator consolidates a skill X into umbrella Y (or archives X
    as pruned), any cron job that lists ``X`` in its ``skills`` field will
    fail to load ``X`` at run time: the scheduler logs a warning and
    skips the skill, so the job runs without the instructions it was
    scheduled to follow. See cron/scheduler.py where ``skill_view`` is
    called per skill name.

    This function repairs cron jobs in-place:

    - A skill listed in ``consolidated`` is replaced with its umbrella
      target (the ``into`` value). If the umbrella is already in the
      job's skill list, the stale name is dropped without duplication.
    - A skill listed in ``pruned`` is dropped outright; there is no
      forwarding target.
    - A skill listed in both is treated as consolidated.
    - Ordering and other skills in the list are preserved.
    - The legacy ``skill`` field is realigned to the first remaining
      skill, or ``None`` if no skills remain.

    Args:
        consolidated: mapping of ``old_skill_name -> umbrella_skill_name``.
        pruned: list of skill names that were archived with no forwarding
            target.

    Returns a report dict::

        {
            "rewrites": [
                {
                    "job_id": ...,
                    "job_name": ...,
                    "before": [...],
                    "after": [...],
                    "mapped": {"old": "new", ...},
                    "dropped": ["old", ...],
                },
                ...
            ],
            "jobs_updated": N,
            "jobs_scanned": M,
        }

    Exceptions from loading/saving propagate to the caller so tests can
    assert behaviour. Best-effort handling lives at the curator invocation
    site, which wraps this call in a try/except so a failure here never
    breaks the curator.
    """
    consolidated = dict(consolidated or {})
    pruned_set = set(pruned or [])
    # A skill listed in both wins as "consolidated": it has a target,
    # which is the more useful of the two outcomes.
    pruned_set -= set(consolidated.keys())

    if not consolidated and not pruned_set:
        return {"rewrites": [], "jobs_updated": 0, "jobs_scanned": 0}

    with _jobs_file_lock:
        jobs = load_jobs()
        rewrites: List[Dict[str, Any]] = []
        changed = False

        for job in jobs:
            skills_before = _normalize_skill_list(job.get("skill"), job.get("skills"))
            if not skills_before:
                continue

            mapped: Dict[str, str] = {}
            dropped: List[str] = []
            new_skills: List[str] = []

            for name in skills_before:
                if name in consolidated:
                    target = consolidated[name]
                    mapped[name] = target
                    if target and target not in new_skills:
                        new_skills.append(target)
                elif name in pruned_set:
                    dropped.append(name)
                else:
                    if name not in new_skills:
                        new_skills.append(name)

            if not mapped and not dropped:
                continue

            job["skills"] = new_skills
            job["skill"] = new_skills[0] if new_skills else None
            changed = True
            rewrites.append({
                "job_id": job.get("id"),
                "job_name": job.get("name") or job.get("id"),
                "before": list(skills_before),
                "after": list(new_skills),
                "mapped": mapped,
                "dropped": dropped,
            })

        if changed:
            save_jobs(jobs)
            logger.info(
                "Curator rewrote skill references in %d cron job(s)", len(rewrites)
            )

        return {
            "rewrites": rewrites,
            "jobs_updated": len(rewrites),
            "jobs_scanned": len(jobs),
        }
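
# Illustrative sketch (not part of this module): the per-job remapping rule
# from rewrite_skill_refs(), isolated as a pure function so it can be tried
# without jobs.json or the file lock. remap_skills() and the skill names
# below are hypothetical and exist only for this example.

```python
from typing import Dict, List, Set, Tuple


def remap_skills(
    skills: List[str],
    consolidated: Dict[str, str],
    pruned: Set[str],
) -> Tuple[List[str], Dict[str, str], List[str]]:
    """Return (new_skills, mapped, dropped) for one job's skill list."""
    # A name present in both inputs is treated as consolidated.
    pruned = set(pruned) - set(consolidated)
    new_skills: List[str] = []
    mapped: Dict[str, str] = {}
    dropped: List[str] = []

    for name in skills:
        if name in consolidated:
            target = consolidated[name]
            mapped[name] = target
            # Skip the umbrella if it is already in the list (no duplicates).
            if target and target not in new_skills:
                new_skills.append(target)
        elif name in pruned:
            dropped.append(name)
        elif name not in new_skills:
            new_skills.append(name)

    return new_skills, mapped, dropped


example = remap_skills(
    ["old-report", "stale-skill", "summarize", "reporting"],
    consolidated={"old-report": "reporting"},
    pruned={"stale-skill"},
)
```

# Here "old-report" is replaced by "reporting" at its original position, the
# later explicit "reporting" entry is not duplicated, and "stale-skill" is
# dropped, leaving ["reporting", "summarize"].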