mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-11 08:42:11 +00:00
Fully removes the cron per-job 'profile' arg added in #28124: the cronjob tool schema field, CLI --profile flags on cron create/edit, job-record storage/validation, the scheduler's _job_profile_context wrapper, and the script-runner env override. Sequential-partition logic reverts to workdir-only. The context-local HERMES_HOME override in hermes_constants and the subprocess bridging in tools/environments/local.py are kept — they now have other consumers (dashboard multi-profile, TUI gateway).
123 lines
4 KiB
Python
123 lines
4 KiB
Python
"""Tests for hermes_cli.cron command handling."""
|
|
|
|
from argparse import Namespace
|
|
|
|
import pytest
|
|
|
|
from cron.jobs import create_job, get_job, list_jobs
|
|
from hermes_cli.cron import cron_command
|
|
|
|
|
|
@pytest.fixture()
def tmp_cron_dir(tmp_path, monkeypatch):
    """Point the ``cron.jobs`` storage paths at a per-test temporary tree.

    ``CRON_DIR``, ``JOBS_FILE`` and ``OUTPUT_DIR`` in ``cron.jobs`` are
    patched to locations under ``tmp_path / "cron"`` for the duration of
    the test. The fixture itself creates nothing on disk.

    Returns:
        The pytest ``tmp_path`` that contains the redirected ``cron`` tree.
    """
    cron_root = tmp_path / "cron"
    redirects = (
        ("cron.jobs.CRON_DIR", cron_root),
        ("cron.jobs.JOBS_FILE", cron_root / "jobs.json"),
        ("cron.jobs.OUTPUT_DIR", cron_root / "output"),
    )
    for dotted_name, location in redirects:
        monkeypatch.setattr(dotted_name, location)
    return tmp_path
|
|
|
|
|
|
class TestCronCommandLifecycle:
    """Drive ``cron_command`` subcommands against a temporary job store.

    Every test requests ``tmp_cron_dir`` so job records live under pytest's
    ``tmp_path``, and ``capsys`` so the printed output can be inspected.
    """

    @staticmethod
    def _edit_args(job_id, **overrides):
        """Build the Namespace passed to ``cron_command`` for an ``edit``.

        Every option starts as ``None`` (``clear_skills`` as ``False``);
        keyword ``overrides`` replace individual entries in place, so the
        attribute order is the same regardless of which ones are supplied.
        """
        fields = {
            "cron_command": "edit",
            "job_id": job_id,
            "schedule": None,
            "prompt": None,
            "name": None,
            "deliver": None,
            "repeat": None,
            "skill": None,
            "skills": None,
            "clear_skills": False,
        }
        fields.update(overrides)
        return Namespace(**fields)

    def test_pause_resume_run(self, tmp_cron_dir, capsys):
        """Pause, resume and run a job, checking stored state and output."""
        job_id = create_job(prompt="Check server status", schedule="every 1h")["id"]

        # (subcommand, state expected in the stored record afterwards)
        transitions = (
            ("pause", "paused"),
            ("resume", "scheduled"),
            ("run", "scheduled"),
        )
        for action, expected_state in transitions:
            cron_command(Namespace(cron_command=action, job_id=job_id))
            assert get_job(job_id)["state"] == expected_state

        # Output is captured once, so it holds the messages of all three calls.
        printed = capsys.readouterr().out
        for banner in ("Paused job", "Resumed job", "Triggered job"):
            assert banner in printed

    def test_edit_can_replace_and_clear_skills(self, tmp_cron_dir, capsys):
        """``edit`` can swap in a new skill list and later clear it."""
        job_id = create_job(
            prompt="Combine skill outputs",
            schedule="every 1h",
            skill="blogwatcher",
        )["id"]

        # First edit: replace schedule, prompt, name and the skill list.
        cron_command(
            self._edit_args(
                job_id,
                schedule="every 2h",
                prompt="Revised prompt",
                name="Edited Job",
                skills=["maps", "blogwatcher"],
            )
        )
        after_replace = get_job(job_id)
        assert after_replace["skills"] == ["maps", "blogwatcher"]
        assert after_replace["name"] == "Edited Job"
        assert after_replace["prompt"] == "Revised prompt"
        # The stored display string expresses the two-hour interval in minutes.
        assert after_replace["schedule_display"] == "every 120m"

        # Second edit: only the clear flag is set.
        cron_command(self._edit_args(job_id, clear_skills=True))
        after_clear = get_job(job_id)
        assert after_clear["skills"] == []
        assert after_clear["skill"] is None

        assert "Updated job" in capsys.readouterr().out

    def test_create_with_multiple_skills(self, tmp_cron_dir, capsys):
        """``create`` stores every skill passed via ``skills``, in order."""
        create_args = Namespace(
            cron_command="create",
            schedule="every 1h",
            prompt="Use both skills",
            name="Skill combo",
            deliver=None,
            repeat=None,
            skill=None,
            skills=["blogwatcher", "maps"],
        )
        cron_command(create_args)
        assert "Created job" in capsys.readouterr().out

        stored = list_jobs()
        assert len(stored) == 1
        only_job = stored[0]
        assert only_job["skills"] == ["blogwatcher", "maps"]
        assert only_job["name"] == "Skill combo"

    def test_list_does_not_crash_when_repeat_is_null(self, tmp_cron_dir, capsys):
        """``cron list`` must cope with a stored ``"repeat": null``.

        The record is rewritten so the ``repeat`` key exists but holds
        ``None``; the listing is expected to show the repeat count as ∞
        instead of failing on a chained ``.get`` lookup.
        """
        from cron.jobs import load_jobs, save_jobs

        create_job(prompt="One shot", schedule="every 1h")

        # A key that is present with a None value is not covered by a
        # ``.get("repeat", {})`` default, so force exactly that shape.
        records = load_jobs()
        records[0]["repeat"] = None
        save_jobs(records)

        cron_command(Namespace(cron_command="list", all=True))

        assert "Repeat: ∞" in capsys.readouterr().out
|