fix(cron): harden scheduler against path traversal and env leaks

Cherry-picked from PR #5503 by Awsh1.

- Validate ALL script paths (absolute, relative, tilde) against scripts_dir boundary
- Add API-boundary validation in cronjob_tools.py
- Move os.environ injections inside try block so finally cleanup always runs
- Comprehensive regression tests for path containment bypass
This commit is contained in:
Awsh1 2026-04-06 12:12:45 -07:00 committed by Teknium
parent 7d0953d6ff
commit 878b1d3d33
3 changed files with 349 additions and 27 deletions

View file

@ -112,6 +112,45 @@ def _normalize_optional_job_value(value: Optional[Any], *, strip_trailing_slash:
return text or None
def _validate_cron_script_path(script: Optional[str]) -> Optional[str]:
"""Validate a cron job script path at the API boundary.
Scripts must be relative paths that resolve within HERMES_HOME/scripts/.
Absolute paths and ~ expansion are rejected to prevent arbitrary script
execution via prompt injection.
Returns an error string if blocked, else None (valid).
"""
if not script or not script.strip():
return None # empty/None = clearing the field, always OK
from pathlib import Path
from hermes_constants import get_hermes_home
raw = script.strip()
# Reject absolute paths and ~ expansion at the API boundary.
# Only relative paths within ~/.hermes/scripts/ are allowed.
if raw.startswith(("/", "~")) or (len(raw) >= 2 and raw[1] == ":"):
return (
f"Script path must be relative to ~/.hermes/scripts/. "
f"Got absolute or home-relative path: {raw!r}. "
f"Place scripts in ~/.hermes/scripts/ and use just the filename."
)
# Validate containment after resolution
scripts_dir = get_hermes_home() / "scripts"
scripts_dir.mkdir(parents=True, exist_ok=True)
resolved = (scripts_dir / raw).resolve()
try:
resolved.relative_to(scripts_dir.resolve())
except ValueError:
return (
f"Script path escapes the scripts directory via traversal: {raw!r}"
)
return None
def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
prompt = job.get("prompt", "")
@ -176,6 +215,12 @@ def cronjob(
if scan_error:
return json.dumps({"success": False, "error": scan_error}, indent=2)
# Validate script path before storing
if script:
script_error = _validate_cron_script_path(script)
if script_error:
return json.dumps({"success": False, "error": script_error}, indent=2)
job = create_job(
prompt=prompt or "",
schedule=schedule,
@ -272,6 +317,10 @@ def cronjob(
updates["base_url"] = _normalize_optional_job_value(base_url, strip_trailing_slash=True)
if script is not None:
# Pass empty string to clear an existing script
if script:
script_error = _validate_cron_script_path(script)
if script_error:
return json.dumps({"success": False, "error": script_error}, indent=2)
updates["script"] = _normalize_optional_job_value(script) if script else None
if repeat is not None:
# Normalize: treat 0 or negative as None (infinite)