""" Cron job storage and management. Jobs are stored in ~/.hermes/cron/jobs.json Output is saved to ~/.hermes/cron/output/{job_id}/{timestamp}.md """ import copy import json import logging import tempfile import threading import os import re import uuid from datetime import datetime, timedelta from pathlib import Path from hermes_constants import get_hermes_home from typing import Optional, Dict, List, Any, Union logger = logging.getLogger(__name__) from hermes_time import now as _hermes_now from utils import atomic_replace try: from croniter import croniter HAS_CRONITER = True except ImportError: HAS_CRONITER = False # ============================================================================= # Configuration # ============================================================================= HERMES_DIR = get_hermes_home().resolve() CRON_DIR = HERMES_DIR / "cron" JOBS_FILE = CRON_DIR / "jobs.json" # In-process lock protecting load_jobs→modify→save_jobs cycles. # Required when tick() runs jobs in parallel threads — without this, # concurrent mark_job_run / advance_next_run calls can clobber each other. 
_jobs_file_lock = threading.Lock()
# NOTE(review): create_job / update_job / remove_job / get_due_jobs below run
# the same load→modify→save cycle WITHOUT taking this lock. The lock is not
# re-entrant, so wrapping them is only safe if no caller already holds it —
# confirm against the scheduler before changing.

OUTPUT_DIR = CRON_DIR / "output"
ONESHOT_GRACE_SECONDS = 120


def _normalize_skill_list(skill: Optional[str] = None, skills: Optional[Any] = None) -> List[str]:
    """Normalize legacy/single-skill and multi-skill inputs into a unique ordered list.

    ``skills`` takes precedence whenever it is not None; ``skill`` is only
    consulted when ``skills`` is None. Items are stringified and stripped;
    empty items and duplicates are dropped, first occurrence wins.
    """
    if skills is None:
        raw_items = [skill] if skill else []
    elif isinstance(skills, str):
        raw_items = [skills]
    else:
        raw_items = list(skills)

    normalized: List[str] = []
    for item in raw_items:
        text = str(item or "").strip()
        if text and text not in normalized:
            normalized.append(text)
    return normalized


def _apply_skill_fields(job: Dict[str, Any]) -> Dict[str, Any]:
    """Return a job dict with canonical `skills` and legacy `skill` fields aligned.

    The input dict is shallow-copied, not mutated.
    """
    normalized = dict(job)
    skills = _normalize_skill_list(normalized.get("skill"), normalized.get("skills"))
    normalized["skills"] = skills
    normalized["skill"] = skills[0] if skills else None
    return normalized


def _secure_dir(path: Path):
    """Set directory to owner-only access (0700). No-op on Windows."""
    try:
        os.chmod(path, 0o700)
    except (OSError, NotImplementedError):
        pass  # Windows or other platforms where chmod is not supported


def _secure_file(path: Path):
    """Set file to owner-only read/write (0600). No-op on Windows."""
    try:
        if path.exists():
            os.chmod(path, 0o600)
    except (OSError, NotImplementedError):
        pass  # Best-effort, same rationale as _secure_dir


def ensure_dirs():
    """Ensure cron directories exist with secure permissions."""
    CRON_DIR.mkdir(parents=True, exist_ok=True)
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    _secure_dir(CRON_DIR)
    _secure_dir(OUTPUT_DIR)


# =============================================================================
# Schedule Parsing
# =============================================================================

def parse_duration(s: str) -> int:
    """
    Parse duration string into minutes.

    Examples:
        "30m" → 30
        "2h" → 120
        "1d" → 1440

    Raises:
        ValueError: if the string is not ``<digits><unit>`` with a supported unit.
    """
    s = s.strip().lower()
    match = re.match(r'^(\d+)\s*(m|min|mins|minute|minutes|h|hr|hrs|hour|hours|d|day|days)$', s)
    if not match:
        raise ValueError(f"Invalid duration: '{s}'. Use format like '30m', '2h', or '1d'")

    value = int(match.group(1))
    unit = match.group(2)[0]  # First char: m, h, or d

    multipliers = {'m': 1, 'h': 60, 'd': 1440}
    return value * multipliers[unit]


def parse_schedule(schedule: str) -> Dict[str, Any]:
    """
    Parse schedule string into structured format.

    Returns dict with:
        - kind: "once" | "interval" | "cron"
        - For "once": "run_at" (ISO timestamp)
        - For "interval": "minutes" (int)
        - For "cron": "expr" (cron expression)

    Examples:
        "30m" → once in 30 minutes
        "2h" → once in 2 hours
        "every 30m" → recurring every 30 minutes
        "every 2h" → recurring every 2 hours
        "0 9 * * *" → cron expression
        "2026-02-03T14:00" → once at timestamp

    Raises:
        ValueError: if the string matches none of the supported forms, or
            matches one but fails its validation (the underlying error is
            chained as ``__cause__``).
    """
    schedule = schedule.strip()
    original = schedule
    schedule_lower = schedule.lower()

    # "every X" pattern → recurring interval
    if schedule_lower.startswith("every "):
        duration_str = schedule[6:].strip()
        minutes = parse_duration(duration_str)
        return {
            "kind": "interval",
            "minutes": minutes,
            "display": f"every {minutes}m"
        }

    # Check for cron expression (5 or 6 space-separated fields)
    # Cron fields: minute hour day month weekday [year]
    parts = schedule.split()
    if len(parts) >= 5 and all(
        re.match(r'^[\d\*\-,/]+$', p) for p in parts[:5]
    ):
        if not HAS_CRONITER:
            raise ValueError("Cron expressions require 'croniter' package. Install with: pip install croniter")
        # Validate cron expression
        try:
            croniter(schedule)
        except Exception as e:
            raise ValueError(f"Invalid cron expression '{schedule}': {e}") from e
        return {
            "kind": "cron",
            "expr": schedule,
            "display": schedule
        }

    # ISO timestamp (contains T or looks like date)
    if 'T' in schedule or re.match(r'^\d{4}-\d{2}-\d{2}', schedule):
        try:
            # Parse and validate
            dt = datetime.fromisoformat(schedule.replace('Z', '+00:00'))
            # Make naive timestamps timezone-aware at parse time so the stored
            # value doesn't depend on the system timezone matching at check time.
            if dt.tzinfo is None:
                dt = dt.astimezone()  # Interpret as local timezone
            return {
                "kind": "once",
                "run_at": dt.isoformat(),
                "display": f"once at {dt.strftime('%Y-%m-%d %H:%M')}"
            }
        except ValueError as e:
            raise ValueError(f"Invalid timestamp '{schedule}': {e}") from e

    # Duration like "30m", "2h", "1d" → one-shot from now
    try:
        minutes = parse_duration(schedule)
        run_at = _hermes_now() + timedelta(minutes=minutes)
        return {
            "kind": "once",
            "run_at": run_at.isoformat(),
            "display": f"once in {original}"
        }
    except ValueError:
        pass  # Not a duration either; fall through to the generic error below

    raise ValueError(
        f"Invalid schedule '{original}'. Use:\n"
        f"  - Duration: '30m', '2h', '1d' (one-shot)\n"
        f"  - Interval: 'every 30m', 'every 2h' (recurring)\n"
        f"  - Cron: '0 9 * * *' (cron expression)\n"
        f"  - Timestamp: '2026-02-03T14:00:00' (one-shot at time)"
    )


def _ensure_aware(dt: datetime) -> datetime:
    """Return a timezone-aware datetime in Hermes configured timezone.

    Backward compatibility:
    - Older stored timestamps may be naive.
    - Naive values are interpreted as *system-local wall time* (the timezone
      `datetime.now()` used when they were created), then converted to the
      configured Hermes timezone.

    This preserves relative ordering for legacy naive timestamps across
    timezone changes and avoids false not-due results.
    """
    target_tz = _hermes_now().tzinfo
    # datetime.astimezone() presumes a naive value is system-local time and
    # applies the UTC offset that was in effect AT ``dt``. Stamping the value
    # with the offset in effect *now* would be off by the DST delta for
    # timestamps on the other side of a DST transition.
    return dt.astimezone(target_tz)


def _recoverable_oneshot_run_at(
    schedule: Dict[str, Any],
    now: datetime,
    *,
    last_run_at: Optional[str] = None,
) -> Optional[str]:
    """Return a one-shot run time if it is still eligible to fire.

    One-shot jobs get a small grace window so jobs created a few seconds after
    their requested minute still run on the next tick. Once a one-shot has
    already run, it is never eligible again.
    """
    if schedule.get("kind") != "once":
        return None
    if last_run_at:
        return None

    run_at = schedule.get("run_at")
    if not run_at:
        return None

    run_at_dt = _ensure_aware(datetime.fromisoformat(run_at))
    if run_at_dt >= now - timedelta(seconds=ONESHOT_GRACE_SECONDS):
        return run_at
    return None


def _compute_grace_seconds(schedule: dict) -> int:
    """Compute how late a job can be and still catch up instead of fast-forwarding.

    Uses half the schedule period, clamped between 120 seconds and 2 hours.
    This ensures daily jobs can catch up if missed by up to 2 hours,
    while frequent jobs (every 5-10 min) still fast-forward quickly.
    """
    MIN_GRACE = 120
    MAX_GRACE = 7200  # 2 hours

    kind = schedule.get("kind")

    if kind == "interval":
        period_seconds = schedule.get("minutes", 1) * 60
        grace = period_seconds // 2
        return max(MIN_GRACE, min(grace, MAX_GRACE))

    if kind == "cron" and HAS_CRONITER:
        try:
            # Estimate the period as the gap between the next two occurrences.
            now = _hermes_now()
            cron = croniter(schedule["expr"], now)
            first = cron.get_next(datetime)
            second = cron.get_next(datetime)
            period_seconds = int((second - first).total_seconds())
            grace = period_seconds // 2
            return max(MIN_GRACE, min(grace, MAX_GRACE))
        except Exception:
            pass  # Best-effort: fall back to the minimum grace below

    return MIN_GRACE


def compute_next_run(schedule: Dict[str, Any], last_run_at: Optional[str] = None) -> Optional[str]:
    """
    Compute the next run time for a schedule.

    Returns ISO timestamp string, or None if no more runs.
    """
    now = _hermes_now()

    if schedule["kind"] == "once":
        return _recoverable_oneshot_run_at(schedule, now, last_run_at=last_run_at)

    elif schedule["kind"] == "interval":
        minutes = schedule["minutes"]
        if last_run_at:
            # Next run is last_run + interval
            last = _ensure_aware(datetime.fromisoformat(last_run_at))
            next_run = last + timedelta(minutes=minutes)
        else:
            # First run is now + interval
            next_run = now + timedelta(minutes=minutes)
        return next_run.isoformat()

    elif schedule["kind"] == "cron":
        if not HAS_CRONITER:
            logger.warning(
                "Cannot compute next run for cron schedule %r: 'croniter' is "
                "not installed. croniter is a core dependency as of v0.9.x; "
                "reinstall hermes-agent or run 'pip install croniter' in your "
                "runtime env.",
                schedule.get("expr"),
            )
            return None
        # Use last_run_at as the croniter base when available, consistent
        # with interval jobs. This ensures that after a crash/restart,
        # the next run is anchored to the actual last execution time
        # rather than to an arbitrary restart time.
        base_time = now
        if last_run_at:
            base_time = _ensure_aware(datetime.fromisoformat(last_run_at))
        cron = croniter(schedule["expr"], base_time)
        next_run = cron.get_next(datetime)
        return next_run.isoformat()

    return None


# =============================================================================
# Job CRUD Operations
# =============================================================================

def load_jobs() -> List[Dict[str, Any]]:
    """Load all jobs from storage.

    Returns an empty list when the jobs file does not exist yet. If the file
    fails strict JSON parsing, it is re-read with ``strict=False`` and, when
    that yields jobs, rewritten in properly escaped form.

    Raises:
        RuntimeError: if the file cannot be read, or is corrupted beyond the
            lenient re-parse.
    """
    ensure_dirs()
    if not JOBS_FILE.exists():
        return []

    try:
        with open(JOBS_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
            return data.get("jobs", [])
    except json.JSONDecodeError:
        # Retry with strict=False to handle bare control chars in string values
        try:
            with open(JOBS_FILE, 'r', encoding='utf-8') as f:
                data = json.loads(f.read(), strict=False)
                jobs = data.get("jobs", [])
                if jobs:
                    # Auto-repair: rewrite with proper escaping
                    save_jobs(jobs)
                    logger.warning("Auto-repaired jobs.json (had invalid control characters)")
                return jobs
        except Exception as e:
            logger.error("Failed to auto-repair jobs.json: %s", e)
            raise RuntimeError(f"Cron database corrupted and unrepairable: {e}") from e
    except IOError as e:
        logger.error("IOError reading jobs.json: %s", e)
        raise RuntimeError(f"Failed to read cron database: {e}") from e


def save_jobs(jobs: List[Dict[str, Any]]):
    """Save all jobs to storage.

    Writes to a temp file in the same directory, fsyncs it, then swaps it
    into place via ``atomic_replace``. The temp file is removed on any failure.
    """
    ensure_dirs()
    fd, tmp_path = tempfile.mkstemp(dir=str(JOBS_FILE.parent), suffix='.tmp', prefix='.jobs_')
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump({"jobs": jobs, "updated_at": _hermes_now().isoformat()}, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        atomic_replace(tmp_path, JOBS_FILE)
        _secure_file(JOBS_FILE)
    except BaseException:
        # Clean up the temp file even on KeyboardInterrupt/SystemExit, then re-raise.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise


def _normalize_workdir(workdir: Optional[str]) -> Optional[str]:
    """Normalize and validate a cron job workdir.

    Rules:
      - Empty / None → None (feature off, preserves old behaviour).
      - ``~`` is expanded. Relative paths are rejected — cron jobs run detached
        from any shell cwd, so relative paths have no stable meaning.
      - The path must exist and be a directory at create/update time. We do
        NOT re-check at run time (a user might briefly unmount the dir; the
        scheduler will just fall back to old behaviour with a logged warning).

    Returns the absolute path string, or None when disabled.
    Raises ValueError on invalid input.
    """
    if workdir is None:
        return None
    raw = str(workdir).strip()
    if not raw:
        return None
    expanded = Path(raw).expanduser()
    if not expanded.is_absolute():
        raise ValueError(
            f"Cron workdir must be an absolute path (got {raw!r}). "
            f"Cron jobs run detached from any shell cwd, so relative paths are ambiguous."
        )
    resolved = expanded.resolve()
    if not resolved.exists():
        raise ValueError(f"Cron workdir does not exist: {resolved}")
    if not resolved.is_dir():
        raise ValueError(f"Cron workdir is not a directory: {resolved}")
    return str(resolved)


def create_job(
    prompt: Optional[str],
    schedule: str,
    name: Optional[str] = None,
    repeat: Optional[int] = None,
    deliver: Optional[str] = None,
    origin: Optional[Dict[str, Any]] = None,
    skill: Optional[str] = None,
    skills: Optional[List[str]] = None,
    model: Optional[str] = None,
    provider: Optional[str] = None,
    base_url: Optional[str] = None,
    script: Optional[str] = None,
    context_from: Optional[Union[str, List[str]]] = None,
    enabled_toolsets: Optional[List[str]] = None,
    workdir: Optional[str] = None,
    no_agent: bool = False,
) -> Dict[str, Any]:
    """
    Create a new cron job.

    Args:
        prompt: The prompt to run (must be self-contained, or a task instruction when skill is set).
            Ignored when ``no_agent=True`` except as an optional name hint.
        schedule: Schedule string (see parse_schedule)
        name: Optional friendly name
        repeat: How many times to run (None = forever, 1 = once)
        deliver: Where to deliver output ("origin", "local", "telegram", etc.)
        origin: Source info where job was created (for "origin" delivery)
        skill: Optional legacy single skill name to load before running the prompt
        skills: Optional ordered list of skills to load before running the prompt
        model: Optional per-job model override
        provider: Optional per-job provider override
        base_url: Optional per-job base URL override
        script: Optional path to a script whose stdout feeds the job. With
            ``no_agent=True`` the script IS the job — its stdout is delivered
            verbatim. Without ``no_agent``, its stdout is injected into the
            agent's prompt as context (data-collection / change-detection
            pattern). Paths resolve under ~/.hermes/scripts/; ``.sh`` /
            ``.bash`` files run via bash, anything else via Python.
        context_from: Optional job ID (or list of job IDs) whose most recent
            output is injected into the prompt as context before each run.
            Useful for chaining cron jobs: job A finds data, job B processes it.
        enabled_toolsets: Optional list of toolset names to restrict the agent to.
            When set, only tools from these toolsets are loaded, reducing
            token overhead. When omitted, all default tools are loaded.
            Ignored when ``no_agent=True``.
        workdir: Optional absolute path. When set, the job runs as if launched
            from that directory: AGENTS.md / CLAUDE.md / .cursorrules from
            that directory are injected into the system prompt, and the
            terminal/file/code_exec tools use it as their working directory
            (via TERMINAL_CWD). When unset, the old behaviour is preserved
            (no context files injected, tools use the scheduler's cwd).
            With ``no_agent=True``, ``workdir`` is still applied as the
            script's cwd so relative paths inside the script behave predictably.
        no_agent: When True, skip the agent entirely — run ``script`` on schedule
            and deliver its stdout directly. Empty stdout = silent (no delivery).
            Requires ``script`` to be set. Ideal for classic watchdogs and
            periodic alerts that don't need LLM reasoning.

    Returns:
        The created job dict

    Raises:
        ValueError: on an invalid schedule or workdir, or when ``no_agent=True``
            is given without a script.
    """
    parsed_schedule = parse_schedule(schedule)

    # Normalize repeat: treat 0 or negative values as None (infinite)
    if repeat is not None and repeat <= 0:
        repeat = None

    # Auto-set repeat=1 for one-shot schedules if not specified
    if parsed_schedule["kind"] == "once" and repeat is None:
        repeat = 1

    # Default delivery to origin if available, otherwise local
    if deliver is None:
        deliver = "origin" if origin else "local"

    job_id = uuid.uuid4().hex[:12]
    now = _hermes_now().isoformat()

    normalized_skills = _normalize_skill_list(skill, skills)
    normalized_model = str(model).strip() if isinstance(model, str) else None
    normalized_provider = str(provider).strip() if isinstance(provider, str) else None
    normalized_base_url = str(base_url).strip().rstrip("/") if isinstance(base_url, str) else None
    # Blank strings collapse to None so "unset" has a single representation.
    normalized_model = normalized_model or None
    normalized_provider = normalized_provider or None
    normalized_base_url = normalized_base_url or None
    normalized_script = str(script).strip() if isinstance(script, str) else None
    normalized_script = normalized_script or None
    normalized_toolsets = [str(t).strip() for t in enabled_toolsets if str(t).strip()] if enabled_toolsets else None
    normalized_toolsets = normalized_toolsets or None
    normalized_workdir = _normalize_workdir(workdir)
    normalized_no_agent = bool(no_agent)

    # no_agent jobs are meaningless without a script — the script IS the job.
    # Surface this as a clear ValueError at create time so bad configs never
    # reach the scheduler.
    if normalized_no_agent and not normalized_script:
        raise ValueError(
            "no_agent=True requires a script — with no agent and no script "
            "there is nothing for the job to run."
        )

    # Normalize context_from: accept str or list of str, store as list or None
    if isinstance(context_from, str):
        context_from = [context_from.strip()] if context_from.strip() else None
    elif isinstance(context_from, list):
        context_from = [str(j).strip() for j in context_from if str(j).strip()] or None
    else:
        context_from = None

    # Default display name: prompt, else first skill, else (no_agent only) the script.
    label_source = (
        prompt
        or (normalized_skills[0] if normalized_skills else None)
        or (normalized_script if normalized_no_agent else None)
    ) or "cron job"
    job = {
        "id": job_id,
        "name": name or label_source[:50].strip(),
        "prompt": prompt,
        "skills": normalized_skills,
        "skill": normalized_skills[0] if normalized_skills else None,
        "model": normalized_model,
        "provider": normalized_provider,
        "base_url": normalized_base_url,
        "script": normalized_script,
        "no_agent": normalized_no_agent,
        "context_from": context_from,
        "schedule": parsed_schedule,
        "schedule_display": parsed_schedule.get("display", schedule),
        "repeat": {
            "times": repeat,  # None = forever
            "completed": 0
        },
        "enabled": True,
        "state": "scheduled",
        "paused_at": None,
        "paused_reason": None,
        "created_at": now,
        "next_run_at": compute_next_run(parsed_schedule),
        "last_run_at": None,
        "last_status": None,
        "last_error": None,
        "last_delivery_error": None,
        # Delivery configuration
        "deliver": deliver,
        "origin": origin,  # Tracks where job was created for "origin" delivery
        "enabled_toolsets": normalized_toolsets,
        "workdir": normalized_workdir,
    }

    jobs = load_jobs()
    jobs.append(job)
    save_jobs(jobs)

    return job


def get_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Get a job by ID (with skill fields aligned), or None if not found."""
    jobs = load_jobs()
    for job in jobs:
        if job["id"] == job_id:
            return _apply_skill_fields(job)
    return None


def list_jobs(include_disabled: bool = False) -> List[Dict[str, Any]]:
    """List all jobs, optionally including disabled ones."""
    jobs = [_apply_skill_fields(j) for j in load_jobs()]
    if not include_disabled:
        jobs = [j for j in jobs if j.get("enabled", True)]
    return jobs


def update_job(job_id: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Update a job by ID, refreshing derived schedule fields when needed.

    Args:
        job_id: ID of the job to update.
        updates: Fields to overlay on the stored job. ``schedule`` may be a
            raw schedule string or a pre-parsed dict. The mapping is not
            modified.

    Returns:
        The updated job dict, or None if no job has that ID.

    Raises:
        ValueError: if ``workdir`` or a string ``schedule`` fails validation.
    """
    # Work on a copy: workdir normalization below rewrites a key, and the
    # caller's dict must not change underneath them.
    updates = dict(updates)

    jobs = load_jobs()
    for i, job in enumerate(jobs):
        if job["id"] != job_id:
            continue

        # Validate / normalize workdir if present in updates. Empty string or
        # None both mean "clear the field" (restore old behaviour).
        if "workdir" in updates:
            _wd = updates["workdir"]
            if _wd in (None, "", False):
                updates["workdir"] = None
            else:
                updates["workdir"] = _normalize_workdir(_wd)

        # NOTE(review): _normalize_skill_list prefers `skills` whenever it is
        # not None, so an update carrying only the legacy `skill` key has no
        # effect on a job that already stores a `skills` list — confirm
        # whether that is intended.
        updated = _apply_skill_fields({**job, **updates})
        schedule_changed = "schedule" in updates

        if "skills" in updates or "skill" in updates:
            normalized_skills = _normalize_skill_list(updated.get("skill"), updated.get("skills"))
            updated["skills"] = normalized_skills
            updated["skill"] = normalized_skills[0] if normalized_skills else None

        if schedule_changed:
            updated_schedule = updated["schedule"]
            # The API may pass schedule as a raw string (e.g. "every 10m")
            # instead of a pre-parsed dict. Normalize it the same way
            # create_job() does so downstream code can call .get() safely.
            if isinstance(updated_schedule, str):
                updated_schedule = parse_schedule(updated_schedule)
                updated["schedule"] = updated_schedule
            updated["schedule_display"] = updates.get(
                "schedule_display",
                updated_schedule.get("display", updated.get("schedule_display")),
            )
            if updated.get("state") != "paused":
                updated["next_run_at"] = compute_next_run(updated_schedule)

        # An active job must always carry a next_run_at; backfill if missing.
        if updated.get("enabled", True) and updated.get("state") != "paused" and not updated.get("next_run_at"):
            updated["next_run_at"] = compute_next_run(updated["schedule"])

        jobs[i] = updated
        save_jobs(jobs)
        return _apply_skill_fields(jobs[i])
    return None


def pause_job(job_id: str, reason: Optional[str] = None) -> Optional[Dict[str, Any]]:
    """Pause a job without deleting it."""
    return update_job(
        job_id,
        {
            "enabled": False,
            "state": "paused",
            "paused_at": _hermes_now().isoformat(),
            "paused_reason": reason,
        },
    )


def resume_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Resume a paused job and compute the next future run from now."""
    job = get_job(job_id)
    if not job:
        return None

    next_run_at = compute_next_run(job["schedule"])
    return update_job(
        job_id,
        {
            "enabled": True,
            "state": "scheduled",
            "paused_at": None,
            "paused_reason": None,
            "next_run_at": next_run_at,
        },
    )


def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Schedule a job to run on the next scheduler tick."""
    job = get_job(job_id)
    if not job:
        return None
    return update_job(
        job_id,
        {
            "enabled": True,
            "state": "scheduled",
            "paused_at": None,
            "paused_reason": None,
            "next_run_at": _hermes_now().isoformat(),
        },
    )


def remove_job(job_id: str) -> bool:
    """Remove a job by ID. Returns True if a job was removed."""
    jobs = load_jobs()
    original_len = len(jobs)
    jobs = [j for j in jobs if j["id"] != job_id]
    if len(jobs) < original_len:
        save_jobs(jobs)
        return True
    return False


def mark_job_run(job_id: str, success: bool, error: Optional[str] = None,
                 delivery_error: Optional[str] = None):
    """
    Mark a job as having been run.

    Updates last_run_at, last_status, increments completed count,
    computes next_run_at, and auto-deletes if repeat limit reached.

    ``delivery_error`` is tracked separately from the agent error — a job
    can succeed (agent produced output) but fail delivery (platform down).
    """
    with _jobs_file_lock:
        jobs = load_jobs()
        for i, job in enumerate(jobs):
            if job["id"] == job_id:
                now = _hermes_now().isoformat()
                job["last_run_at"] = now
                job["last_status"] = "ok" if success else "error"
                job["last_error"] = error if not success else None
                # Track delivery failures separately — cleared on successful delivery
                job["last_delivery_error"] = delivery_error

                # Increment completed count
                if job.get("repeat"):
                    job["repeat"]["completed"] = job["repeat"].get("completed", 0) + 1

                    # Check if we've hit the repeat limit
                    times = job["repeat"].get("times")
                    completed = job["repeat"]["completed"]
                    if times is not None and times > 0 and completed >= times:
                        # Remove the job (limit reached). Popping during
                        # iteration is safe here because we return at once.
                        jobs.pop(i)
                        save_jobs(jobs)
                        return

                # Compute next run
                job["next_run_at"] = compute_next_run(job["schedule"], now)

                # If no next run, decide whether this is terminal completion
                # (one-shot) or a transient failure (recurring schedule couldn't
                # compute — e.g. 'croniter' missing from the runtime env).
                # Recurring jobs must NEVER be silently disabled: that turns a
                # missing runtime dep into "job completed" and the user's
                # schedule quietly goes off. See issue #16265.
                if job["next_run_at"] is None:
                    kind = job.get("schedule", {}).get("kind")
                    if kind in ("cron", "interval"):
                        job["state"] = "error"
                        if not job.get("last_error"):
                            job["last_error"] = (
                                "Failed to compute next run for recurring "
                                "schedule (is the 'croniter' package "
                                "installed in the gateway's Python env?)"
                            )
                        logger.error(
                            "Job '%s' (%s) could not compute next_run_at; "
                            "leaving enabled and marking state=error so the "
                            "job is not silently disabled.",
                            job.get("name", job["id"]),
                            kind,
                        )
                    else:
                        job["enabled"] = False
                        job["state"] = "completed"
                elif job.get("state") != "paused":
                    job["state"] = "scheduled"

                save_jobs(jobs)
                return

        logger.warning("mark_job_run: job_id %s not found, skipping save", job_id)


def advance_next_run(job_id: str) -> bool:
    """Preemptively advance next_run_at for a recurring job before execution.

    Call this BEFORE run_job() so that if the process crashes mid-execution,
    the job won't re-fire on the next gateway restart. This converts the
    scheduler from at-least-once to at-most-once for recurring jobs — missing
    one run is far better than firing dozens of times in a crash loop.

    One-shot jobs are left unchanged so they can still retry on restart.

    Returns True if next_run_at was advanced, False otherwise.
    """
    with _jobs_file_lock:
        jobs = load_jobs()
        for job in jobs:
            if job["id"] == job_id:
                kind = job.get("schedule", {}).get("kind")
                if kind not in ("cron", "interval"):
                    return False
                now = _hermes_now().isoformat()
                new_next = compute_next_run(job["schedule"], now)
                if new_next and new_next != job.get("next_run_at"):
                    job["next_run_at"] = new_next
                    save_jobs(jobs)
                    return True
                return False
        return False


def _store_next_run(raw_jobs: List[Dict[str, Any]], job_id: str, next_run_at: str) -> bool:
    """Set ``next_run_at`` on the first raw job with this ID; return True if found."""
    for raw_job in raw_jobs:
        if raw_job["id"] == job_id:
            raw_job["next_run_at"] = next_run_at
            return True
    return False


def get_due_jobs() -> List[Dict[str, Any]]:
    """Get all jobs that are due to run now.

    For recurring jobs (cron/interval), if the scheduled time is stale
    (later than the catch-up grace from ``_compute_grace_seconds``, e.g.
    because the gateway was down), the job is fast-forwarded to the next
    future run instead of firing immediately. This prevents a burst of
    missed jobs on gateway restart.

    Any recovered or fast-forwarded ``next_run_at`` values are written back
    to storage before returning.
    """
    now = _hermes_now()
    raw_jobs = load_jobs()
    # Deep copy so skill-field alignment on the returned jobs never leaks
    # into the raw list that may be saved back.
    jobs = [_apply_skill_fields(j) for j in copy.deepcopy(raw_jobs)]
    due = []
    needs_save = False

    for job in jobs:
        if not job.get("enabled", True):
            continue

        next_run = job.get("next_run_at")
        if not next_run:
            schedule = job.get("schedule", {})
            kind = schedule.get("kind")

            # One-shot jobs use a small grace window via the dedicated helper.
            recovered_next = _recoverable_oneshot_run_at(
                schedule,
                now,
                last_run_at=job.get("last_run_at"),
            )
            recovery_kind = "one-shot" if recovered_next else None

            # Recurring jobs reach here only when something — typically a
            # direct jobs.json edit that bypassed create_job() — left
            # next_run_at unset. Without this branch, such jobs are
            # silently skipped forever; recompute next_run_at from the
            # schedule so they pick up at their next scheduled tick.
            if not recovered_next and kind in ("cron", "interval"):
                recovered_next = compute_next_run(schedule, now.isoformat())
                if recovered_next:
                    recovery_kind = kind

            if not recovered_next:
                continue

            job["next_run_at"] = recovered_next
            next_run = recovered_next
            logger.info(
                "Job '%s' had no next_run_at; recovering %s run at %s",
                job.get("name", job["id"]),
                recovery_kind,
                recovered_next,
            )
            if _store_next_run(raw_jobs, job["id"], recovered_next):
                needs_save = True

        next_run_dt = _ensure_aware(datetime.fromisoformat(next_run))
        if next_run_dt <= now:
            schedule = job.get("schedule", {})
            kind = schedule.get("kind")

            # For recurring jobs, check if the scheduled time is stale
            # (gateway was down and missed the window). Fast-forward to
            # the next future occurrence instead of firing a stale run.
            grace = _compute_grace_seconds(schedule)
            if kind in ("cron", "interval") and (now - next_run_dt).total_seconds() > grace:
                # Job is past its catch-up grace window — this is a stale missed run.
                # Grace scales with schedule period: daily=2h, hourly=30m, 10min=5m.
                new_next = compute_next_run(schedule, now.isoformat())
                if new_next:
                    logger.info(
                        "Job '%s' missed its scheduled time (%s, grace=%ds). "
                        "Fast-forwarding to next run: %s",
                        job.get("name", job["id"]),
                        next_run,
                        grace,
                        new_next,
                    )
                    # Update the job in storage
                    if _store_next_run(raw_jobs, job["id"], new_next):
                        needs_save = True
                    continue  # Skip this run

            due.append(job)

    if needs_save:
        save_jobs(raw_jobs)

    return due


def save_job_output(job_id: str, output: str):
    """Save job output to file and return the output file's path.

    The file is named after the current time at one-second resolution and
    written via temp file + ``atomic_replace``.
    """
    ensure_dirs()
    job_output_dir = OUTPUT_DIR / job_id
    job_output_dir.mkdir(parents=True, exist_ok=True)
    _secure_dir(job_output_dir)

    timestamp = _hermes_now().strftime("%Y-%m-%d_%H-%M-%S")
    output_file = job_output_dir / f"{timestamp}.md"

    fd, tmp_path = tempfile.mkstemp(dir=str(job_output_dir), suffix='.tmp', prefix='.output_')
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            f.write(output)
            f.flush()
            os.fsync(f.fileno())
        atomic_replace(tmp_path, output_file)
        _secure_file(output_file)
    except BaseException:
        # Clean up the temp file even on KeyboardInterrupt/SystemExit, then re-raise.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise

    return output_file


# =============================================================================
# Skill reference rewriting (curator integration)
# =============================================================================

def rewrite_skill_refs(
    consolidated: Optional[Dict[str, str]] = None,
    pruned: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Rewrite cron job skill references after a curator consolidation pass.

    When the curator consolidates a skill X into umbrella Y (or archives X
    as pruned), any cron job that lists ``X`` in its ``skills`` field will
    fail to load ``X`` at run time — the scheduler logs a warning and skips
    the skill, so the job runs without the instructions it was scheduled
    to follow. See cron/scheduler.py where ``skill_view`` is called per
    skill name.

    This function repairs cron jobs in-place:

    - A skill listed in ``consolidated`` is replaced with its umbrella
      target (the ``into`` value). If the umbrella is already in the
      job's skill list, the stale name is dropped without duplication.
    - A skill listed in ``pruned`` is dropped outright — there is no
      forwarding target.
    - Ordering and other skills in the list are preserved.
    - The legacy ``skill`` field is realigned to the first entry of the
      rewritten ``skills`` list (None when the list is empty).

    Args:
        consolidated: mapping of ``old_skill_name -> umbrella_skill_name``.
        pruned: list of skill names that were archived with no forwarding
            target.

    Returns a report dict::

        {
            "rewrites": [
                {
                    "job_id": ...,
                    "job_name": ...,
                    "before": [...],
                    "after": [...],
                    "mapped": {"old": "new", ...},
                    "dropped": ["old", ...],
                },
                ...
            ],
            "jobs_updated": N,
            "jobs_scanned": M,
        }

    Best-effort: exceptions from loading/saving propagate to the caller
    so tests can assert behaviour; the curator invocation site wraps this
    call in a try/except so a failure here never breaks the curator.
    """
    consolidated = dict(consolidated or {})
    pruned_set = set(pruned or [])
    # A skill listed in both wins as "consolidated" — it has a target,
    # which is the more useful of the two outcomes.
    pruned_set -= set(consolidated.keys())

    if not consolidated and not pruned_set:
        return {"rewrites": [], "jobs_updated": 0, "jobs_scanned": 0}

    with _jobs_file_lock:
        jobs = load_jobs()
        rewrites: List[Dict[str, Any]] = []
        changed = False

        for job in jobs:
            skills_before = _normalize_skill_list(job.get("skill"), job.get("skills"))
            if not skills_before:
                continue

            mapped: Dict[str, str] = {}
            dropped: List[str] = []
            new_skills: List[str] = []

            for name in skills_before:
                if name in consolidated:
                    target = consolidated[name]
                    mapped[name] = target
                    # A falsy target drops the skill; a duplicate is skipped.
                    if target and target not in new_skills:
                        new_skills.append(target)
                elif name in pruned_set:
                    dropped.append(name)
                else:
                    if name not in new_skills:
                        new_skills.append(name)

            if not mapped and not dropped:
                continue

            job["skills"] = new_skills
            job["skill"] = new_skills[0] if new_skills else None
            changed = True
            rewrites.append({
                "job_id": job.get("id"),
                "job_name": job.get("name") or job.get("id"),
                "before": list(skills_before),
                "after": list(new_skills),
                "mapped": mapped,
                "dropped": dropped,
            })

        if changed:
            save_jobs(jobs)
            logger.info(
                "Curator rewrote skill references in %d cron job(s)",
                len(rewrites)
            )

    return {
        "rewrites": rewrites,
        "jobs_updated": len(rewrites),
        "jobs_scanned": len(jobs),
    }