hermes-agent/hermes_cli/cron.py
Teknium 660e36f097
fix(cron): scope job execution to its owning profile (#32091 follow-up) (#50993)
The #32091 fix moved every profile's cron jobs into one shared root store,
but never wired the execution-scoping half it recommended: a job still ran
under whichever profile's ticker picked it up, not its owning profile. So a
job created under `hermes -p donna` could execute with the root profile's
.env / config.yaml / credentials.

- jobs.py: create_job auto-captures the active profile (explicit profile=
  override available) and stores it on the job; resolve_profile_home() maps a
  profile name to its HERMES_HOME; legacy jobs backfill to 'default'.
- scheduler.py: run_job applies the job's profile via a scoped HERMES_HOME
  override (env var + in-process ContextVar) before any .env/config/script
  load, restored in finally. tick() routes profile-mismatched jobs to the
  single-worker sequential pool so the env mutation can't race.
- cronjob tool threads profile through (NOT exposed in the model schema, to
  avoid cross-profile privilege escalation); hermes cron add gains --profile.

E2E verified against a temp HERMES_HOME with a real profile dir: a root-profile
ticker runs a profile='donna' job with HERMES_HOME=donna during execution and
restores the ticker env afterward.
2026-06-22 14:54:28 -07:00

411 lines
16 KiB
Python

"""
Cron subcommand for hermes CLI.
Handles standalone cron management commands like list, create, edit,
pause/resume/run/remove, status, and tick.
"""
import json
import re
import sys
from pathlib import Path
from typing import Iterable, List, Optional
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
sys.path.insert(0, str(PROJECT_ROOT))
from hermes_cli.colors import Colors, color
# Patterns that indicate a cron job targets the gateway lifecycle.
# Matches commands that restart/stop the gateway or its service manager.
# Deliberately specific — a bare "gateway ... restart" catch-all would block
# legitimate prompts that merely mention an unrelated gateway (e.g. "summarize
# the API gateway logs and report restart events").
#
# Every alternative starts with \b so a command name is only recognised as a
# whole word: without the anchor, `p?kill` also matched the tail of "skill",
# so a prompt such as "use the skill to summarize hermes gateway logs" was
# rejected. The trailing \b on the first alternative likewise keeps prose such
# as "the hermes gateway stopped responding" from being treated as a command.
_GATEWAY_LIFECYCLE_PATTERNS = re.compile(
    r"(?i)"
    r"(\bhermes\s+gateway\s+(restart|stop|start)\b)"
    r"|(\blaunchctl\s+(kickstart|unload|load|stop|restart)\s+.*hermes)"
    r"|(\bsystemctl\s+(-\S+\s+)*(restart|stop|start)\s+.*hermes)"
    r"|(\bp?kill\s+.*hermes.*gateway)"
)
def _contains_gateway_lifecycle_command(text: str) -> bool:
    """Report whether *text* matches any gateway lifecycle command pattern.

    The check is a search (not a full match) against
    ``_GATEWAY_LIFECYCLE_PATTERNS``, so a hit anywhere inside *text* counts.
    """
    hit = _GATEWAY_LIFECYCLE_PATTERNS.search(text)
    return hit is not None
def _normalize_skills(single_skill=None, skills: Optional[Iterable[str]] = None) -> Optional[List[str]]:
    """Collapse the single-skill / multi-skill inputs into one clean list.

    Args:
        single_skill: A lone skill name; consulted only when *skills* is None.
        skills: An iterable of skill names, or one name given as a bare
            string. When it is not None it takes precedence over
            *single_skill*.

    Returns:
        None when neither argument was supplied; otherwise the names with
        surrounding whitespace stripped, blank entries dropped and duplicates
        removed, in first-seen order. An empty list is a valid result.
    """
    if skills is None:
        if single_skill is None:
            return None
        raw_items = [single_skill]
    elif isinstance(skills, str):
        # A bare string is one skill name; iterating it would yield characters.
        raw_items = [skills]
    else:
        raw_items = list(skills)

    cleaned = (str(item or "").strip() for item in raw_items)
    # dict keys keep insertion order, so this de-duplicates stably.
    return list(dict.fromkeys(text for text in cleaned if text))
def _cron_api(**kwargs):
    """Invoke the cronjob tool with *kwargs* and return its decoded JSON reply.

    The tool module is imported at call time rather than at module import.
    """
    from tools.cronjob_tools import cronjob

    raw_reply = cronjob(**kwargs)
    return json.loads(raw_reply)
def cron_list(show_all: bool = False):
    """Print every scheduled job, then warn if no gateway process is found.

    Args:
        show_all: Forwarded to ``list_jobs`` as ``include_disabled``.
    """
    from cron.jobs import list_jobs

    jobs = list_jobs(include_disabled=show_all)
    if not jobs:
        print(color("No scheduled jobs.", Colors.DIM))
        print(color("Create one with 'hermes cron create ...' or the /cron command in chat.", Colors.DIM))
        return

    print()
    print(color("┌─────────────────────────────────────────────────────────────────────────┐", Colors.CYAN))
    print(color("│ Scheduled Jobs │", Colors.CYAN))
    print(color("└─────────────────────────────────────────────────────────────────────────┘", Colors.CYAN))
    print()

    for job in jobs:
        job_id = job.get("id", "?")
        name = job.get("name", "(unnamed)")

        # Fields may be present-but-null in the job record, and a dict
        # default only applies to a *missing* key, so coalesce explicitly.
        # The raw schedule is also only consulted when no display string
        # exists, so a null "schedule" cannot break a job that has one.
        schedule = job.get("schedule_display")
        if not schedule:
            schedule_info = job.get("schedule")
            schedule = schedule_info.get("value", "?") if isinstance(schedule_info, dict) else "?"

        state = job.get("state", "scheduled" if job.get("enabled", True) else "paused")
        next_run = job.get("next_run_at") or "?"

        # `repeat` may be present-but-null in the job record (e.g. a one-shot
        # job persisted with "repeat": null), so coalesce to {}.
        repeat_info = job.get("repeat") or {}
        repeat_times = repeat_info.get("times")
        repeat_completed = repeat_info.get("completed", 0)
        repeat_str = f"{repeat_completed}/{repeat_times}" if repeat_times else ""

        deliver = job.get("deliver") or ["local"]
        if isinstance(deliver, str):
            deliver = [deliver]
        deliver_str = ", ".join(str(target) for target in deliver)

        # Prefer the list form; fall back to the single `skill` field.
        skills = job.get("skills") or ([job["skill"]] if job.get("skill") else [])

        if state == "paused":
            status = color("[paused]", Colors.YELLOW)
        elif state == "completed":
            status = color("[completed]", Colors.BLUE)
        elif job.get("enabled", True):
            status = color("[active]", Colors.GREEN)
        else:
            status = color("[disabled]", Colors.RED)

        print(f" {color(job_id, Colors.YELLOW)} {status}")
        print(f" Name: {name}")
        print(f" Schedule: {schedule}")
        print(f" Repeat: {repeat_str}")
        print(f" Next run: {next_run}")
        print(f" Deliver: {deliver_str}")
        if skills:
            print(f" Skills: {', '.join(skills)}")
        script = job.get("script")
        if script:
            print(f" Script: {script}")
        if job.get("no_agent"):
            print(f" Mode: {color('no-agent', Colors.DIM)} (script stdout delivered directly)")
        workdir = job.get("workdir")
        if workdir:
            print(f" Workdir: {workdir}")
        # The "default" profile is implied, so only other profiles are shown.
        _prof = job.get("profile")
        if _prof and _prof != "default":
            print(f" Profile: {_prof}")

        # Execution history
        last_status = job.get("last_status")
        if last_status:
            last_run = job.get("last_run_at", "?")
            if last_status == "ok":
                status_display = color("ok", Colors.GREEN)
            else:
                status_display = color(f"{last_status}: {job.get('last_error', '?')}", Colors.RED)
            print(f" Last run: {last_run} {status_display}")

        delivery_err = job.get("last_delivery_error")
        if delivery_err:
            print(f" {color('⚠ Delivery failed:', Colors.YELLOW)} {delivery_err}")

        print()

    from hermes_cli.gateway import find_gateway_pids

    if not find_gateway_pids():
        print(color(" ⚠ Gateway is not running — jobs won't fire automatically.", Colors.YELLOW))
        print(color(" Start it with: hermes gateway install", Colors.DIM))
        print(color(" sudo hermes gateway install --system # Linux servers", Colors.DIM))
        print()
def cron_tick():
    """Invoke the scheduler's ``tick`` once, in verbose mode, and return."""
    from cron.scheduler import tick as run_due_jobs

    run_due_jobs(verbose=True)
def cron_status():
    """Print gateway/ticker health followed by a summary of active jobs."""
    from cron.jobs import list_jobs
    from hermes_cli.gateway import find_gateway_pids

    print()

    pids = find_gateway_pids()
    if pids:
        # The gateway PROCESS is alive — but the cron ticker THREAD inside it
        # can die silently, or stay alive while every tick fails. Check both
        # the liveness heartbeat and the last-successful-tick marker so we
        # don't report "will fire" when the ticker is dead or failing
        # (#32612, #32895).
        from cron.jobs import (
            get_ticker_heartbeat_age,
            get_ticker_success_age,
            TICKER_INTERVAL_SECONDS,
        )

        # Allow ~3 missed ticker iterations (+ a little slack) before declaring
        # trouble. Derived from the shared interval constant so this threshold
        # tracks the ticker cadence instead of assuming a hardcoded 60s.
        stale_after = TICKER_INTERVAL_SECONDS * 3 + 20  # = 200s at the 60s default
        hb_age = get_ticker_heartbeat_age()
        ok_age = get_ticker_success_age()
        pid_line = f" PID: {', '.join(map(str, pids))}"

        # Either age may be None; both warnings require a known heartbeat
        # age, so an unknown heartbeat falls through to the healthy branch.
        if hb_age is not None and hb_age > stale_after:
            # The last heartbeat is too old → the ticker loop has stopped
            # beating. Report the expected cadence from the same constant the
            # threshold uses, so the message stays true if the interval changes.
            print(color(
                "⚠ Gateway is running but the cron ticker looks STALLED — "
                f"no heartbeat for {int(hb_age)}s "
                f"(expected every ~{int(TICKER_INTERVAL_SECONDS)}s).",
                Colors.YELLOW,
            ))
            print(pid_line)
            print(" Cron jobs may NOT be firing. Restart: hermes gateway restart")
        elif hb_age is not None and ok_age is not None and ok_age > stale_after:
            # Loop is alive (fresh heartbeat) but no tick has SUCCEEDED in a
            # long time → ticks are failing every iteration.
            print(color(
                "⚠ Gateway and cron ticker are running, but no tick has "
                f"succeeded in {int(ok_age)}s — ticks may be failing.",
                Colors.YELLOW,
            ))
            print(pid_line)
            print(" Check the gateway log for 'Cron tick error'.")
        else:
            print(color("✓ Gateway is running — cron jobs will fire automatically", Colors.GREEN))
            print(pid_line)
            if hb_age is not None:
                print(f" Ticker heartbeat: {int(hb_age)}s ago")
    else:
        print(color("✗ Gateway is not running — cron jobs will NOT fire", Colors.RED))
        print()
        print(" To enable automatic execution:")
        print(" hermes gateway install # Install as a user service")
        print(" sudo hermes gateway install --system # Linux servers: boot-time system service")
        print(" hermes gateway # Or run in foreground")

    print()

    jobs = list_jobs(include_disabled=False)
    if jobs:
        next_runs = [j.get("next_run_at") for j in jobs if j.get("next_run_at")]
        print(f" {len(jobs)} active job(s)")
        if next_runs:
            print(f" Next run: {min(next_runs)}")
    else:
        print(" No active jobs")

    print()
def cron_create(args):
    """Create a cron job from parsed CLI *args* and print a summary.

    Returns:
        0 when the job was created; 1 when it was blocked by the gateway
        lifecycle check or the cron API reported a failure.
    """
    # Defense: reject cron jobs that contain gateway lifecycle commands.
    # Prevents agents from scheduling their own restart/stop, which creates
    # SIGTERM-respawn loops under launchd/systemd KeepAlive (#30719).
    scan_text = getattr(args, "prompt", None) or ""
    script_path = getattr(args, "script", None)
    if script_path:
        # Best-effort: a script that cannot be read here is simply not
        # scanned; the job request is still sent on to the cron API.
        # NOTE(review): the path is read as given (relative to the CWD) —
        # confirm this matches how the scheduler later locates the script.
        try:
            script_body = Path(script_path).read_text(encoding="utf-8")
            scan_text = f"{scan_text}\n{script_body}"
        except (OSError, UnicodeDecodeError):
            pass

    if _contains_gateway_lifecycle_command(scan_text):
        blocked_msg = (
            "Blocked: cron job contains a gateway lifecycle command "
            "(restart/stop/kill).\n"
            "This is blocked to prevent restart loops (#30719).\n"
            "Use `hermes gateway restart` from a shell outside the gateway."
        )
        print(color(blocked_msg, Colors.RED))
        return 1

    request = dict(
        action="create",
        schedule=args.schedule,
        prompt=args.prompt,
        name=getattr(args, "name", None),
        deliver=getattr(args, "deliver", None),
        repeat=getattr(args, "repeat", None),
        skill=getattr(args, "skill", None),
        skills=_normalize_skills(getattr(args, "skill", None), getattr(args, "skills", None)),
        script=getattr(args, "script", None),
        workdir=getattr(args, "workdir", None),
        # A falsy flag is sent as None rather than False.
        no_agent=getattr(args, "no_agent", False) or None,
        profile=getattr(args, "profile", None),
    )
    result = _cron_api(**request)

    if not result.get("success"):
        failure = result.get("error", "unknown error")
        print(color(f"Failed to create job: {failure}", Colors.RED))
        return 1

    print(color(f"Created job: {result['job_id']}", Colors.GREEN))
    print(f" Name: {result['name']}")
    print(f" Schedule: {result['schedule']}")
    if result.get("skills"):
        print(f" Skills: {', '.join(result['skills'])}")

    created = result.get("job", {})
    if created.get("script"):
        print(f" Script: {created['script']}")
    if created.get("no_agent"):
        print(" Mode: no-agent (script stdout delivered directly)")
    if created.get("workdir"):
        print(f" Workdir: {created['workdir']}")
    # The "default" profile is implied, so only other profiles are shown.
    owner_profile = created.get("profile")
    if owner_profile and owner_profile != "default":
        print(f" Profile: {owner_profile}")

    print(f" Next run: {result['next_run_at']}")
    return 0
def cron_edit(args):
    """Update an existing cron job from parsed CLI *args* and print a summary.

    Skill changes are resolved in priority order: ``clear_skills`` empties
    the list, an explicit replacement (``skill``/``skills``) overrides it,
    and otherwise ``add_skills``/``remove_skills`` adjust the job's current
    skills. When none apply, ``skills=None`` is sent to the cron API.

    Returns:
        0 on success; 1 when the job reference is ambiguous or unknown, the
        new prompt/script is blocked by the gateway lifecycle check, or the
        cron API reports a failure.
    """
    from cron.jobs import AmbiguousJobReference, resolve_job_ref

    try:
        job = resolve_job_ref(args.job_id)
    except AmbiguousJobReference as exc:
        print(color(str(exc), Colors.RED))
        for m in exc.matches:
            print(f" {m['id']} (name: {m.get('name')!r})")
        return 1
    if not job:
        print(color(f"Job not found: {args.job_id}", Colors.RED))
        return 1

    # Defense: apply the same gateway lifecycle check as cron_create, so the
    # guard cannot be sidestepped by creating a harmless job and then editing
    # its prompt or script (#30719). Only newly supplied values are scanned,
    # so unrelated edits to an existing job are never blocked.
    scan_text = getattr(args, "prompt", None) or ""
    new_script = getattr(args, "script", None)
    if new_script:
        # Best-effort, as in cron_create: an unreadable script is not scanned.
        try:
            script_text = Path(new_script).read_text(encoding="utf-8")
            scan_text = f"{scan_text}\n{script_text}"
        except (OSError, UnicodeDecodeError):
            pass
    if _contains_gateway_lifecycle_command(scan_text):
        print(color(
            "Blocked: cron job contains a gateway lifecycle command "
            "(restart/stop/kill).\n"
            "This is blocked to prevent restart loops (#30719).\n"
            "Use `hermes gateway restart` from a shell outside the gateway.",
            Colors.RED,
        ))
        return 1

    # Prefer the list form; fall back to the single `skill` field.
    existing_skills = list(job.get("skills") or ([] if not job.get("skill") else [job.get("skill")]))
    replacement_skills = _normalize_skills(getattr(args, "skill", None), getattr(args, "skills", None))
    add_skills = _normalize_skills(None, getattr(args, "add_skills", None)) or []
    remove_skills = set(_normalize_skills(None, getattr(args, "remove_skills", None)) or [])

    final_skills = None
    if getattr(args, "clear_skills", False):
        final_skills = []
    elif replacement_skills is not None:
        final_skills = replacement_skills
    elif add_skills or remove_skills:
        final_skills = [skill for skill in existing_skills if skill not in remove_skills]
        for skill in add_skills:
            if skill not in final_skills:
                final_skills.append(skill)

    result = _cron_api(
        action="update",
        job_id=args.job_id,
        schedule=getattr(args, "schedule", None),
        prompt=getattr(args, "prompt", None),
        name=getattr(args, "name", None),
        deliver=getattr(args, "deliver", None),
        repeat=getattr(args, "repeat", None),
        skills=final_skills,
        script=getattr(args, "script", None),
        workdir=getattr(args, "workdir", None),
        no_agent=getattr(args, "no_agent", None),
    )
    if not result.get("success"):
        print(color(f"Failed to update job: {result.get('error', 'unknown error')}", Colors.RED))
        return 1

    updated = result["job"]
    print(color(f"Updated job: {updated['job_id']}", Colors.GREEN))
    print(f" Name: {updated['name']}")
    print(f" Schedule: {updated['schedule']}")
    if updated.get("skills"):
        print(f" Skills: {', '.join(updated['skills'])}")
    else:
        print(" Skills: none")
    if updated.get("script"):
        print(f" Script: {updated['script']}")
    if updated.get("no_agent"):
        print(" Mode: no-agent (script stdout delivered directly)")
    if updated.get("workdir"):
        print(f" Workdir: {updated['workdir']}")
    # Show the owning profile as cron_create and cron_list do; "default" is
    # implied and therefore omitted.
    _prof = updated.get("profile")
    if _prof and _prof != "default":
        print(f" Profile: {_prof}")
    return 0
def _job_action(action: str, job_id: str, success_verb: str) -> int:
    """Run a single-job cron API *action* and print the outcome.

    Args:
        action: API action name; this function gives extra output for
            "resume" and "run".
        job_id: Job reference forwarded to the cron API.
        success_verb: Past-tense verb used in the success line.

    Returns:
        0 when the API reports success, 1 otherwise.
    """
    result = _cron_api(action=action, job_id=job_id)
    if not result.get("success"):
        print(color(f"Failed to {action} job: {result.get('error', 'unknown error')}", Colors.RED))
        return 1

    # "job" may be missing or present-but-null; coalesce once so the lookups
    # below cannot fail on None.
    job = result.get("job") or {}
    # For the name in the headline, fall back to "removed_job".
    subject = job or result.get("removed_job") or {}
    print(color(f"{success_verb} job: {subject.get('name', job_id)} ({job_id})", Colors.GREEN))

    if action in {"resume", "run"} and job.get("next_run_at"):
        print(f" Next run: {job['next_run_at']}")

    if action == "run":
        if job.get("executed"):
            outcome = "succeeded" if job.get("execution_success") else "failed"
            print(f" Ran now: {outcome}.")
        elif job.get("execution_skipped"):
            print(f" {job['execution_skipped']}")
        else:
            print(" It will run on the next scheduler tick.")
    return 0
def cron_command(args):
    """Dispatch a parsed ``hermes cron`` invocation to its handler.

    A missing subcommand is treated as "list". Returns 0 for list, status
    and tick, and the handler's own return value for the other subcommands.
    An unrecognised subcommand prints usage and exits the process with
    status 1.
    """
    subcmd = getattr(args, 'cron_command', None)

    if subcmd is None or subcmd == "list":
        cron_list(getattr(args, 'all', False))
        return 0
    if subcmd == "status":
        cron_status()
        return 0
    if subcmd == "tick":
        cron_tick()
        return 0

    if subcmd in {"create", "add"}:
        return cron_create(args)
    if subcmd == "edit":
        return cron_edit(args)

    # Subcommand (including aliases) -> (API action, verb for the success line).
    job_actions = {
        "pause": ("pause", "Paused"),
        "resume": ("resume", "Resumed"),
        "run": ("run", "Triggered"),
        "remove": ("remove", "Removed"),
        "rm": ("remove", "Removed"),
        "delete": ("remove", "Removed"),
    }
    if subcmd in job_actions:
        api_action, verb = job_actions[subcmd]
        return _job_action(api_action, args.job_id, verb)

    print(f"Unknown cron command: {subcmd}")
    print("Usage: hermes cron [list|create|edit|pause|resume|run|remove|status|tick]")
    sys.exit(1)