feat(cron): add context_from field for cron job output chaining

This commit is contained in:
MorAlekss 2026-04-23 02:35:41 -07:00
parent 00c3d848d8
commit d0ce224511
4 changed files with 337 additions and 2 deletions

View file

@ -16,7 +16,7 @@ import uuid
from datetime import datetime, timedelta
from pathlib import Path
from hermes_constants import get_hermes_home
from typing import Optional, Dict, List, Any
from typing import Optional, Dict, List, Any, Union
logger = logging.getLogger(__name__)
@ -417,6 +417,7 @@ def create_job(
provider: Optional[str] = None,
base_url: Optional[str] = None,
script: Optional[str] = None,
context_from: Optional[Union[str, List[str]]] = None,
enabled_toolsets: Optional[List[str]] = None,
workdir: Optional[str] = None,
) -> Dict[str, Any]:
@ -438,6 +439,9 @@ def create_job(
script: Optional path to a Python script whose stdout is injected into the
prompt each run. The script runs before the agent turn, and its output
is prepended as context. Useful for data collection / change detection.
context_from: Optional job ID (or list of job IDs) whose most recent output
is injected into the prompt as context before each run.
Useful for chaining cron jobs: job A finds data, job B processes it.
enabled_toolsets: Optional list of toolset names to restrict the agent to.
When set, only tools from these toolsets are loaded, reducing
token overhead. When omitted, all default tools are loaded.
@ -481,6 +485,14 @@ def create_job(
normalized_toolsets = normalized_toolsets or None
normalized_workdir = _normalize_workdir(workdir)
# Normalize context_from: accept str or list of str, store as list or None
if isinstance(context_from, str):
context_from = [context_from.strip()] if context_from.strip() else None
elif isinstance(context_from, list):
context_from = [str(j).strip() for j in context_from if str(j).strip()] or None
else:
context_from = None
label_source = (prompt or (normalized_skills[0] if normalized_skills else None)) or "cron job"
job = {
"id": job_id,
@ -492,6 +504,7 @@ def create_job(
"provider": normalized_provider,
"base_url": normalized_base_url,
"script": normalized_script,
"context_from": context_from,
"schedule": parsed_schedule,
"schedule_display": parsed_schedule.get("display", schedule),
"repeat": {

View file

@ -671,6 +671,61 @@ def _build_job_prompt(job: dict, prerun_script: Optional[tuple] = None) -> str:
f"{prompt}"
)
# Inject output from referenced cron jobs as context.
context_from = job.get("context_from")
if context_from:
from cron.jobs import OUTPUT_DIR
if isinstance(context_from, str):
context_from = [context_from]
for source_job_id in context_from:
# Guard against path traversal — valid job IDs are 12-char hex strings
if not source_job_id or not all(c in "0123456789abcdef" for c in source_job_id):
logger.warning("context_from: skipping invalid job_id %r", source_job_id)
continue
try:
job_output_dir = OUTPUT_DIR / source_job_id
if not job_output_dir.exists():
prompt = (
f"[Context job '{source_job_id}' has no output yet.]\n\n"
f"{prompt}"
)
continue
output_files = sorted(
job_output_dir.glob("*.md"),
key=lambda f: f.stat().st_mtime,
reverse=True,
)
if not output_files:
prompt = (
f"[Context job '{source_job_id}' has no output yet.]\n\n"
f"{prompt}"
)
continue
latest_output = output_files[0].read_text(encoding="utf-8").strip()
# Truncate to 8K characters to avoid prompt bloat
_MAX_CONTEXT_CHARS = 8000
if len(latest_output) > _MAX_CONTEXT_CHARS:
latest_output = latest_output[:_MAX_CONTEXT_CHARS] + "\n\n[... output truncated ...]"
if latest_output:
prompt = (
f"## Output from job '{source_job_id}'\n"
"The following is the most recent output from a preceding "
"cron job. Use it as context for your analysis.\n\n"
f"```\n{latest_output}\n```\n\n"
f"{prompt}"
)
else:
prompt = (
f"[Context job '{source_job_id}' has no output yet.]\n\n"
f"{prompt}"
)
except (OSError, PermissionError) as e:
logger.warning("context_from: failed to read output for job %r: %s", source_job_id, e)
prompt = (
f"[Context job '{source_job_id}' output could not be read.]\n\n"
f"{prompt}"
)
# Always prepend cron execution guidance so the agent knows how
# delivery works and can suppress delivery when appropriate.
cron_hint = (

View file

@ -0,0 +1,238 @@
"""Tests for cron job context_from feature (issue #5439 Option C)."""
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
@pytest.fixture
def cron_env(tmp_path, monkeypatch):
"""Isolated cron environment with temp HERMES_HOME."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / "cron").mkdir()
(hermes_home / "cron" / "output").mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
import cron.jobs as jobs_mod
monkeypatch.setattr(jobs_mod, "HERMES_DIR", hermes_home)
monkeypatch.setattr(jobs_mod, "CRON_DIR", hermes_home / "cron")
monkeypatch.setattr(jobs_mod, "JOBS_FILE", hermes_home / "cron" / "jobs.json")
monkeypatch.setattr(jobs_mod, "OUTPUT_DIR", hermes_home / "cron" / "output")
return hermes_home
class TestJobContextFromField:
"""Test that context_from is stored and retrieved correctly."""
def test_create_job_with_context_from_string(self, cron_env):
from cron.jobs import create_job, get_job
job_a = create_job(prompt="Find news", schedule="every 1h")
job_b = create_job(
prompt="Summarize findings",
schedule="every 2h",
context_from=job_a["id"],
)
assert job_b["context_from"] == [job_a["id"]]
loaded = get_job(job_b["id"])
assert loaded["context_from"] == [job_a["id"]]
def test_create_job_with_context_from_list(self, cron_env):
from cron.jobs import create_job, get_job
job_a = create_job(prompt="Find news", schedule="every 1h")
job_b = create_job(prompt="Find weather", schedule="every 1h")
job_c = create_job(
prompt="Summarize everything",
schedule="every 2h",
context_from=[job_a["id"], job_b["id"]],
)
assert job_c["context_from"] == [job_a["id"], job_b["id"]]
def test_create_job_without_context_from(self, cron_env):
from cron.jobs import create_job
job = create_job(prompt="Hello", schedule="every 1h")
assert job.get("context_from") is None
def test_context_from_empty_string_normalized_to_none(self, cron_env):
from cron.jobs import create_job
job = create_job(prompt="Hello", schedule="every 1h", context_from="")
assert job.get("context_from") is None
def test_context_from_empty_list_normalized_to_none(self, cron_env):
from cron.jobs import create_job
job = create_job(prompt="Hello", schedule="every 1h", context_from=[])
assert job.get("context_from") is None
class TestBuildJobPromptContextFrom:
"""Test that _build_job_prompt() injects context from referenced jobs."""
def test_injects_latest_output(self, cron_env):
from cron.jobs import create_job, OUTPUT_DIR
from cron.scheduler import _build_job_prompt
job_a = create_job(prompt="Find news", schedule="every 1h")
# Записываем output для job_a
output_dir = OUTPUT_DIR / job_a["id"]
output_dir.mkdir(parents=True, exist_ok=True)
(output_dir / "2026-04-22_10-00-00.md").write_text(
"Today's top story: AI is everywhere.", encoding="utf-8"
)
job_b = create_job(
prompt="Summarize the news",
schedule="every 2h",
context_from=job_a["id"],
)
prompt = _build_job_prompt(job_b)
assert "Today's top story: AI is everywhere." in prompt
assert f"Output from job '{job_a['id']}'" in prompt
def test_uses_most_recent_output(self, cron_env):
from cron.jobs import create_job, OUTPUT_DIR
from cron.scheduler import _build_job_prompt
import time
job_a = create_job(prompt="Find news", schedule="every 1h")
output_dir = OUTPUT_DIR / job_a["id"]
output_dir.mkdir(parents=True, exist_ok=True)
old_file = output_dir / "2026-04-22_08-00-00.md"
old_file.write_text("Old output", encoding="utf-8")
time.sleep(0.01)
new_file = output_dir / "2026-04-22_10-00-00.md"
new_file.write_text("New output", encoding="utf-8")
job_b = create_job(
prompt="Summarize", schedule="every 2h", context_from=job_a["id"]
)
prompt = _build_job_prompt(job_b)
assert "New output" in prompt
assert "Old output" not in prompt
def test_graceful_when_no_output_yet(self, cron_env):
from cron.jobs import create_job
from cron.scheduler import _build_job_prompt
job_a = create_job(prompt="Find news", schedule="every 1h")
job_b = create_job(
prompt="Summarize", schedule="every 2h", context_from=job_a["id"]
)
# job_a ещё не запускался — output dir не существует
prompt = _build_job_prompt(job_b)
assert "no output yet" in prompt.lower() or "not found" in prompt.lower()
assert "Summarize" in prompt
def test_injects_multiple_context_jobs(self, cron_env):
from cron.jobs import create_job, OUTPUT_DIR
from cron.scheduler import _build_job_prompt
job_a = create_job(prompt="Find news", schedule="every 1h")
job_b = create_job(prompt="Find weather", schedule="every 1h")
for job, content in [(job_a, "News: AI boom"), (job_b, "Weather: Sunny")]:
out_dir = OUTPUT_DIR / job["id"]
out_dir.mkdir(parents=True, exist_ok=True)
(out_dir / "2026-04-22_10-00-00.md").write_text(content, encoding="utf-8")
job_c = create_job(
prompt="Daily briefing",
schedule="every 2h",
context_from=[job_a["id"], job_b["id"]],
)
prompt = _build_job_prompt(job_c)
assert "News: AI boom" in prompt
assert "Weather: Sunny" in prompt
def test_context_injected_before_prompt(self, cron_env):
"""Context should appear before the job's own prompt."""
from cron.jobs import create_job, OUTPUT_DIR
from cron.scheduler import _build_job_prompt
job_a = create_job(prompt="Find data", schedule="every 1h")
out_dir = OUTPUT_DIR / job_a["id"]
out_dir.mkdir(parents=True, exist_ok=True)
(out_dir / "2026-04-22_10-00-00.md").write_text("Context data", encoding="utf-8")
job_b = create_job(
prompt="Process the data above",
schedule="every 2h",
context_from=job_a["id"],
)
prompt = _build_job_prompt(job_b)
context_pos = prompt.find("Context data")
prompt_pos = prompt.find("Process the data above")
assert context_pos < prompt_pos
def test_output_truncated_at_8k_chars(self, cron_env):
"""Output longer than 8000 chars should be truncated."""
from cron.jobs import create_job, OUTPUT_DIR
from cron.scheduler import _build_job_prompt
job_a = create_job(prompt="Find data", schedule="every 1h")
out_dir = OUTPUT_DIR / job_a["id"]
out_dir.mkdir(parents=True, exist_ok=True)
big_output = "x" * 10000
(out_dir / "2026-04-22_10-00-00.md").write_text(big_output, encoding="utf-8")
job_b = create_job(
prompt="Process", schedule="every 2h", context_from=job_a["id"]
)
prompt = _build_job_prompt(job_b)
assert "truncated" in prompt
assert "x" * 10000 not in prompt
def test_graceful_when_file_deleted_between_listing_and_reading(self, cron_env):
"""Job should not crash if output file is deleted mid-read."""
from cron.jobs import create_job, OUTPUT_DIR
from cron.scheduler import _build_job_prompt
from unittest.mock import patch
job_a = create_job(prompt="Find data", schedule="every 1h")
out_dir = OUTPUT_DIR / job_a["id"]
out_dir.mkdir(parents=True, exist_ok=True)
(out_dir / "2026-04-22_10-00-00.md").write_text("Some output", encoding="utf-8")
job_b = create_job(
prompt="Process", schedule="every 2h", context_from=job_a["id"]
)
# Simulate file deleted between glob() and read_text()
original_read = Path.read_text
def mock_read_text(self, *args, **kwargs):
if self.suffix == ".md":
raise FileNotFoundError("file deleted mid-read")
return original_read(self, *args, **kwargs)
with patch.object(Path, "read_text", mock_read_text):
prompt = _build_job_prompt(job_b)
# Job should not crash, prompt should still contain the base prompt
assert "Process" in prompt
def test_invalid_job_id_skipped(self, cron_env):
"""context_from with path traversal job_id should be skipped."""
from cron.jobs import create_job
from cron.scheduler import _build_job_prompt
job = create_job(prompt="Process", schedule="every 2h")
# Manually inject invalid context_from (simulating tampered jobs.json)
job["context_from"] = ["../../../etc/passwd"]
prompt = _build_job_prompt(job)
# Should not crash and should not inject anything malicious
assert "Process" in prompt
assert "etc/passwd" not in prompt

View file

@ -11,7 +11,7 @@ import os
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
from hermes_constants import display_hermes_home
@ -238,6 +238,7 @@ def cronjob(
base_url: Optional[str] = None,
reason: Optional[str] = None,
script: Optional[str] = None,
context_from: Optional[Union[str, List[str]]] = None,
enabled_toolsets: Optional[List[str]] = None,
workdir: Optional[str] = None,
task_id: str = None,
@ -265,6 +266,18 @@ def cronjob(
if script_error:
return tool_error(script_error, success=False)
# Validate context_from references existing jobs
if context_from:
from cron.jobs import get_job as _get_job
refs = [context_from] if isinstance(context_from, str) else context_from
for ref_id in refs:
if not _get_job(ref_id):
return tool_error(
f"context_from job '{ref_id}' not found. "
"Use cronjob(action='list') to see available jobs.",
success=False,
)
job = create_job(
prompt=prompt or "",
schedule=schedule,
@ -277,6 +290,7 @@ def cronjob(
provider=_normalize_optional_job_value(provider),
base_url=_normalize_optional_job_value(base_url, strip_trailing_slash=True),
script=_normalize_optional_job_value(script),
context_from=context_from,
enabled_toolsets=enabled_toolsets or None,
workdir=_normalize_optional_job_value(workdir),
)
@ -473,6 +487,19 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
"type": "string",
"description": f"Optional path to a Python script that runs before each cron job execution. Its stdout is injected into the prompt as context. Use for data collection and change detection. Relative paths resolve under {display_hermes_home()}/scripts/. On update, pass empty string to clear."
},
"context_from": {
"type": "array",
"items": {"type": "string"},
"description": (
"Optional job ID or list of job IDs whose most recent completed output is "
"injected into the prompt as context before each run. "
"Use this to chain cron jobs: job A collects data, job B processes it. "
"Each entry must be a valid job ID (from cronjob action='list'). "
"Note: injects the most recent completed output — does not wait for "
"upstream jobs running in the same tick. "
"On update, pass an empty array to clear."
),
},
"enabled_toolsets": {
"type": "array",
"items": {"type": "string"},
@ -481,6 +508,7 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
"workdir": {
"type": "string",
"description": "Optional absolute path to run the job from. When set, AGENTS.md / CLAUDE.md / .cursorrules from that directory are injected into the system prompt, and the terminal/file/code_exec tools use it as their working directory — useful for running a job inside a specific project repo. Must be an absolute path that exists. When unset (default), preserves the original behaviour: no project context files, tools use the scheduler's cwd. On update, pass an empty string to clear. Jobs with workdir run sequentially (not parallel) to keep per-job directories isolated."
},
},
"required": ["action"]
@ -526,6 +554,7 @@ registry.register(
base_url=args.get("base_url"),
reason=args.get("reason"),
script=args.get("script"),
context_from=args.get("context_from"),
enabled_toolsets=args.get("enabled_toolsets"),
workdir=args.get("workdir"),
task_id=kw.get("task_id"),