diff --git a/cron/jobs.py b/cron/jobs.py index 158f53654a..c9a41ca2f5 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -16,7 +16,7 @@ import uuid from datetime import datetime, timedelta from pathlib import Path from hermes_constants import get_hermes_home -from typing import Optional, Dict, List, Any +from typing import Optional, Dict, List, Any, Union logger = logging.getLogger(__name__) @@ -417,6 +417,7 @@ def create_job( provider: Optional[str] = None, base_url: Optional[str] = None, script: Optional[str] = None, + context_from: Optional[Union[str, List[str]]] = None, enabled_toolsets: Optional[List[str]] = None, workdir: Optional[str] = None, ) -> Dict[str, Any]: @@ -438,6 +439,9 @@ def create_job( script: Optional path to a Python script whose stdout is injected into the prompt each run. The script runs before the agent turn, and its output is prepended as context. Useful for data collection / change detection. + context_from: Optional job ID (or list of job IDs) whose most recent output + is injected into the prompt as context before each run. + Useful for chaining cron jobs: job A finds data, job B processes it. enabled_toolsets: Optional list of toolset names to restrict the agent to. When set, only tools from these toolsets are loaded, reducing token overhead. When omitted, all default tools are loaded. 
@@ -481,6 +485,14 @@ def create_job( normalized_toolsets = normalized_toolsets or None normalized_workdir = _normalize_workdir(workdir) + # Normalize context_from: accept str or list of str, store as list or None + if isinstance(context_from, str): + context_from = [context_from.strip()] if context_from.strip() else None + elif isinstance(context_from, list): + context_from = [str(j).strip() for j in context_from if str(j).strip()] or None + else: + context_from = None + label_source = (prompt or (normalized_skills[0] if normalized_skills else None)) or "cron job" job = { "id": job_id, @@ -492,6 +504,7 @@ def create_job( "provider": normalized_provider, "base_url": normalized_base_url, "script": normalized_script, + "context_from": context_from, "schedule": parsed_schedule, "schedule_display": parsed_schedule.get("display", schedule), "repeat": { diff --git a/cron/scheduler.py b/cron/scheduler.py index 3dbb54c7d8..5924ba19e6 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -671,6 +671,61 @@ def _build_job_prompt(job: dict, prerun_script: Optional[tuple] = None) -> str: f"{prompt}" ) + # Inject output from referenced cron jobs as context. 
+ context_from = job.get("context_from") + if context_from: + from cron.jobs import OUTPUT_DIR + if isinstance(context_from, str): + context_from = [context_from] + for source_job_id in context_from: + # Guard against path traversal — valid job IDs are 12-char hex strings + if not source_job_id or not all(c in "0123456789abcdef" for c in source_job_id): + logger.warning("context_from: skipping invalid job_id %r", source_job_id) + continue + try: + job_output_dir = OUTPUT_DIR / source_job_id + if not job_output_dir.exists(): + prompt = ( + f"[Context job '{source_job_id}' has no output yet.]\n\n" + f"{prompt}" + ) + continue + output_files = sorted( + job_output_dir.glob("*.md"), + key=lambda f: f.stat().st_mtime, + reverse=True, + ) + if not output_files: + prompt = ( + f"[Context job '{source_job_id}' has no output yet.]\n\n" + f"{prompt}" + ) + continue + latest_output = output_files[0].read_text(encoding="utf-8").strip() + # Truncate to 8K characters to avoid prompt bloat + _MAX_CONTEXT_CHARS = 8000 + if len(latest_output) > _MAX_CONTEXT_CHARS: + latest_output = latest_output[:_MAX_CONTEXT_CHARS] + "\n\n[... output truncated ...]" + if latest_output: + prompt = ( + f"## Output from job '{source_job_id}'\n" + "The following is the most recent output from a preceding " + "cron job. Use it as context for your analysis.\n\n" + f"```\n{latest_output}\n```\n\n" + f"{prompt}" + ) + else: + prompt = ( + f"[Context job '{source_job_id}' has no output yet.]\n\n" + f"{prompt}" + ) + except (OSError, PermissionError) as e: + logger.warning("context_from: failed to read output for job %r: %s", source_job_id, e) + prompt = ( + f"[Context job '{source_job_id}' output could not be read.]\n\n" + f"{prompt}" + ) + # Always prepend cron execution guidance so the agent knows how # delivery works and can suppress delivery when appropriate. 
cron_hint = ( diff --git a/tests/cron/test_cron_context_from.py b/tests/cron/test_cron_context_from.py new file mode 100644 index 0000000000..449a067898 --- /dev/null +++ b/tests/cron/test_cron_context_from.py @@ -0,0 +1,238 @@ +"""Tests for cron job context_from feature (issue #5439 Option C).""" + +import sys +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + + +@pytest.fixture +def cron_env(tmp_path, monkeypatch): + """Isolated cron environment with temp HERMES_HOME.""" + hermes_home = tmp_path / ".hermes" + hermes_home.mkdir() + (hermes_home / "cron").mkdir() + (hermes_home / "cron" / "output").mkdir() + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + + import cron.jobs as jobs_mod + monkeypatch.setattr(jobs_mod, "HERMES_DIR", hermes_home) + monkeypatch.setattr(jobs_mod, "CRON_DIR", hermes_home / "cron") + monkeypatch.setattr(jobs_mod, "JOBS_FILE", hermes_home / "cron" / "jobs.json") + monkeypatch.setattr(jobs_mod, "OUTPUT_DIR", hermes_home / "cron" / "output") + + return hermes_home + + +class TestJobContextFromField: + """Test that context_from is stored and retrieved correctly.""" + + def test_create_job_with_context_from_string(self, cron_env): + from cron.jobs import create_job, get_job + + job_a = create_job(prompt="Find news", schedule="every 1h") + job_b = create_job( + prompt="Summarize findings", + schedule="every 2h", + context_from=job_a["id"], + ) + + assert job_b["context_from"] == [job_a["id"]] + loaded = get_job(job_b["id"]) + assert loaded["context_from"] == [job_a["id"]] + + def test_create_job_with_context_from_list(self, cron_env): + from cron.jobs import create_job, get_job + + job_a = create_job(prompt="Find news", schedule="every 1h") + job_b = create_job(prompt="Find weather", schedule="every 1h") + job_c = create_job( + prompt="Summarize everything", + schedule="every 2h", + context_from=[job_a["id"], job_b["id"]], + ) + + assert job_c["context_from"] == [job_a["id"], 
job_b["id"]] + + def test_create_job_without_context_from(self, cron_env): + from cron.jobs import create_job + + job = create_job(prompt="Hello", schedule="every 1h") + assert job.get("context_from") is None + + def test_context_from_empty_string_normalized_to_none(self, cron_env): + from cron.jobs import create_job + + job = create_job(prompt="Hello", schedule="every 1h", context_from="") + assert job.get("context_from") is None + + def test_context_from_empty_list_normalized_to_none(self, cron_env): + from cron.jobs import create_job + + job = create_job(prompt="Hello", schedule="every 1h", context_from=[]) + assert job.get("context_from") is None + + +class TestBuildJobPromptContextFrom: + """Test that _build_job_prompt() injects context from referenced jobs.""" + + def test_injects_latest_output(self, cron_env): + from cron.jobs import create_job, OUTPUT_DIR + from cron.scheduler import _build_job_prompt + + job_a = create_job(prompt="Find news", schedule="every 1h") + + # Write output for job_a + output_dir = OUTPUT_DIR / job_a["id"] + output_dir.mkdir(parents=True, exist_ok=True) + (output_dir / "2026-04-22_10-00-00.md").write_text( + "Today's top story: AI is everywhere.", encoding="utf-8" + ) + + job_b = create_job( + prompt="Summarize the news", + schedule="every 2h", + context_from=job_a["id"], + ) + + prompt = _build_job_prompt(job_b) + assert "Today's top story: AI is everywhere."
in prompt + assert f"Output from job '{job_a['id']}'" in prompt + + def test_uses_most_recent_output(self, cron_env): + from cron.jobs import create_job, OUTPUT_DIR + from cron.scheduler import _build_job_prompt + import time + + job_a = create_job(prompt="Find news", schedule="every 1h") + output_dir = OUTPUT_DIR / job_a["id"] + output_dir.mkdir(parents=True, exist_ok=True) + + old_file = output_dir / "2026-04-22_08-00-00.md" + old_file.write_text("Old output", encoding="utf-8") + time.sleep(0.01) + new_file = output_dir / "2026-04-22_10-00-00.md" + new_file.write_text("New output", encoding="utf-8") + + job_b = create_job( + prompt="Summarize", schedule="every 2h", context_from=job_a["id"] + ) + prompt = _build_job_prompt(job_b) + assert "New output" in prompt + assert "Old output" not in prompt + + def test_graceful_when_no_output_yet(self, cron_env): + from cron.jobs import create_job + from cron.scheduler import _build_job_prompt + + job_a = create_job(prompt="Find news", schedule="every 1h") + job_b = create_job( + prompt="Summarize", schedule="every 2h", context_from=job_a["id"] + ) + + # job_a has not run yet — the output dir does not exist + prompt = _build_job_prompt(job_b) + assert "no output yet" in prompt.lower() or "not found" in prompt.lower() + assert "Summarize" in prompt + + def test_injects_multiple_context_jobs(self, cron_env): + from cron.jobs import create_job, OUTPUT_DIR + from cron.scheduler import _build_job_prompt + + job_a = create_job(prompt="Find news", schedule="every 1h") + job_b = create_job(prompt="Find weather", schedule="every 1h") + + for job, content in [(job_a, "News: AI boom"), (job_b, "Weather: Sunny")]: + out_dir = OUTPUT_DIR / job["id"] + out_dir.mkdir(parents=True, exist_ok=True) + (out_dir / "2026-04-22_10-00-00.md").write_text(content, encoding="utf-8") + + job_c = create_job( + prompt="Daily briefing", + schedule="every 2h", + context_from=[job_a["id"], job_b["id"]], + ) + prompt = _build_job_prompt(job_c) + assert "News:
AI boom" in prompt + assert "Weather: Sunny" in prompt + + def test_context_injected_before_prompt(self, cron_env): + """Context should appear before the job's own prompt.""" + from cron.jobs import create_job, OUTPUT_DIR + from cron.scheduler import _build_job_prompt + + job_a = create_job(prompt="Find data", schedule="every 1h") + out_dir = OUTPUT_DIR / job_a["id"] + out_dir.mkdir(parents=True, exist_ok=True) + (out_dir / "2026-04-22_10-00-00.md").write_text("Context data", encoding="utf-8") + + job_b = create_job( + prompt="Process the data above", + schedule="every 2h", + context_from=job_a["id"], + ) + prompt = _build_job_prompt(job_b) + context_pos = prompt.find("Context data") + prompt_pos = prompt.find("Process the data above") + assert context_pos < prompt_pos + + def test_output_truncated_at_8k_chars(self, cron_env): + """Output longer than 8000 chars should be truncated.""" + from cron.jobs import create_job, OUTPUT_DIR + from cron.scheduler import _build_job_prompt + + job_a = create_job(prompt="Find data", schedule="every 1h") + out_dir = OUTPUT_DIR / job_a["id"] + out_dir.mkdir(parents=True, exist_ok=True) + big_output = "x" * 10000 + (out_dir / "2026-04-22_10-00-00.md").write_text(big_output, encoding="utf-8") + + job_b = create_job( + prompt="Process", schedule="every 2h", context_from=job_a["id"] + ) + prompt = _build_job_prompt(job_b) + assert "truncated" in prompt + assert "x" * 10000 not in prompt + + def test_graceful_when_file_deleted_between_listing_and_reading(self, cron_env): + """Job should not crash if output file is deleted mid-read.""" + from cron.jobs import create_job, OUTPUT_DIR + from cron.scheduler import _build_job_prompt + from unittest.mock import patch + + job_a = create_job(prompt="Find data", schedule="every 1h") + out_dir = OUTPUT_DIR / job_a["id"] + out_dir.mkdir(parents=True, exist_ok=True) + (out_dir / "2026-04-22_10-00-00.md").write_text("Some output", encoding="utf-8") + + job_b = create_job( + prompt="Process", 
schedule="every 2h", context_from=job_a["id"] + ) + + # Simulate file deleted between glob() and read_text() + original_read = Path.read_text + def mock_read_text(self, *args, **kwargs): + if self.suffix == ".md": + raise FileNotFoundError("file deleted mid-read") + return original_read(self, *args, **kwargs) + + with patch.object(Path, "read_text", mock_read_text): + prompt = _build_job_prompt(job_b) + + # Job should not crash, prompt should still contain the base prompt + assert "Process" in prompt + + def test_invalid_job_id_skipped(self, cron_env): + """context_from with path traversal job_id should be skipped.""" + from cron.jobs import create_job + from cron.scheduler import _build_job_prompt + + job = create_job(prompt="Process", schedule="every 2h") + # Manually inject invalid context_from (simulating tampered jobs.json) + job["context_from"] = ["../../../etc/passwd"] + prompt = _build_job_prompt(job) + # Should not crash and should not inject anything malicious + assert "Process" in prompt + assert "etc/passwd" not in prompt diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py index 88a28611ef..2d9485f2b0 100644 --- a/tools/cronjob_tools.py +++ b/tools/cronjob_tools.py @@ -11,7 +11,7 @@ import os import re import sys from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union from hermes_constants import display_hermes_home @@ -238,6 +238,7 @@ def cronjob( base_url: Optional[str] = None, reason: Optional[str] = None, script: Optional[str] = None, + context_from: Optional[Union[str, List[str]]] = None, enabled_toolsets: Optional[List[str]] = None, workdir: Optional[str] = None, task_id: str = None, @@ -265,6 +266,18 @@ def cronjob( if script_error: return tool_error(script_error, success=False) + # Validate context_from references existing jobs + if context_from: + from cron.jobs import get_job as _get_job + refs = [context_from] if isinstance(context_from, str) else context_from + for 
ref_id in refs: + if not _get_job(ref_id): + return tool_error( + f"context_from job '{ref_id}' not found. " + "Use cronjob(action='list') to see available jobs.", + success=False, + ) + job = create_job( prompt=prompt or "", schedule=schedule, @@ -277,6 +290,7 @@ def cronjob( provider=_normalize_optional_job_value(provider), base_url=_normalize_optional_job_value(base_url, strip_trailing_slash=True), script=_normalize_optional_job_value(script), + context_from=context_from, enabled_toolsets=enabled_toolsets or None, workdir=_normalize_optional_job_value(workdir), ) @@ -473,6 +487,19 @@ Important safety rule: cron-run sessions should not recursively schedule more cr "type": "string", "description": f"Optional path to a Python script that runs before each cron job execution. Its stdout is injected into the prompt as context. Use for data collection and change detection. Relative paths resolve under {display_hermes_home()}/scripts/. On update, pass empty string to clear." }, + "context_from": { + "type": "array", + "items": {"type": "string"}, + "description": ( + "Optional job ID or list of job IDs whose most recent completed output is " + "injected into the prompt as context before each run. " + "Use this to chain cron jobs: job A collects data, job B processes it. " + "Each entry must be a valid job ID (from cronjob action='list'). " + "Note: injects the most recent completed output — does not wait for " + "upstream jobs running in the same tick. " + "On update, pass an empty array to clear." + ), + }, "enabled_toolsets": { "type": "array", "items": {"type": "string"}, @@ -481,6 +508,7 @@ Important safety rule: cron-run sessions should not recursively schedule more cr "workdir": { "type": "string", "description": "Optional absolute path to run the job from. 
When set, AGENTS.md / CLAUDE.md / .cursorrules from that directory are injected into the system prompt, and the terminal/file/code_exec tools use it as their working directory — useful for running a job inside a specific project repo. Must be an absolute path that exists. When unset (default), preserves the original behaviour: no project context files, tools use the scheduler's cwd. On update, pass an empty string to clear. Jobs with workdir run sequentially (not parallel) to keep per-job directories isolated." + }, }, "required": ["action"] @@ -526,6 +554,7 @@ registry.register( base_url=args.get("base_url"), reason=args.get("reason"), script=args.get("script"), + context_from=args.get("context_from"), enabled_toolsets=args.get("enabled_toolsets"), workdir=args.get("workdir"), task_id=kw.get("task_id"),