From 65d7c7fafdf1719fc71ea35466b5c42a6ab1bf15 Mon Sep 17 00:00:00 2001 From: kyssta-exe Date: Sun, 21 Jun 2026 13:10:54 +0530 Subject: [PATCH] fix(cron): execute job immediately on action='run' MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `cronjob(action='run')` (and `hermes cron run`) only set `next_run_at = now` and returned success, relying on the scheduler ticker to actually execute the job on its next tick. When no gateway/ticker is running — a CLI-only setup, or the Windows case in #41037 — the job never executed: `run` reported success, but `last_run_at` stayed null forever, no output, no delivery. A manual `run` should actually run. `_execute_job_now` now: - **claims the job via `claim_job_for_fire`** — the same at-most-once CAS the scheduler/external-provider fire path uses. This both advances `next_run_at` for recurring jobs and blocks a concurrently-running gateway ticker from double-firing the same job; if the claim is lost, the run is skipped (the tool reports `execution_skipped`). This closes the double-fire race that a bare `advance_next_run` left open (a tick whose `get_due_jobs` already captured the job between trigger and advance would still fire it). - **delegates firing to `run_one_job`** — the single shared execute→save→deliver→mark body the ticker and external providers use — so failure delivery, `[SILENT]` handling, and live-adapter delivery stay identical across paths and can't drift. (The original salvage re-implemented this sequence inline and had already dropped failure delivery + `[SILENT]`.) The tool response carries `executed`, `execution_success`, and either `execution_error` or `execution_skipped`. The `hermes cron run` CLI message no longer claims "It will run on the next scheduler tick" — it reports the actual "Ran now: succeeded/failed" outcome (or the skip). 
Salvaged from #41130 by @kyssta-exe (authorship preserved); reworked to reuse `claim_job_for_fire` + `run_one_job` per review rather than re-implementing the fire sequence inline. Adds tests for the claim-then-fire path, claim-lost skip, failure reporting, and exception capture. Fixes #41037 Co-authored-by: kyssta-exe --- hermes_cli/cron.py | 9 ++- tests/tools/test_cronjob_run_immediate.py | 81 +++++++++++++++++++++++ tools/cronjob_tools.py | 68 ++++++++++++++++++- 3 files changed, 154 insertions(+), 4 deletions(-) create mode 100644 tests/tools/test_cronjob_run_immediate.py diff --git a/hermes_cli/cron.py b/hermes_cli/cron.py index 86f8e6b09e2..6be6236ea45 100644 --- a/hermes_cli/cron.py +++ b/hermes_cli/cron.py @@ -313,7 +313,14 @@ def _job_action(action: str, job_id: str, success_verb: str) -> int: if action in {"resume", "run"} and result.get("job", {}).get("next_run_at"): print(f" Next run: {result['job']['next_run_at']}") if action == "run": - print(" It will run on the next scheduler tick.") + job = result.get("job", {}) + if job.get("executed"): + outcome = "succeeded" if job.get("execution_success") else "failed" + print(f" Ran now: {outcome}.") + elif job.get("execution_skipped"): + print(f" {job['execution_skipped']}") + else: + print(" It will run on the next scheduler tick.") return 0 diff --git a/tests/tools/test_cronjob_run_immediate.py b/tests/tools/test_cronjob_run_immediate.py new file mode 100644 index 00000000000..9efa60e82cb --- /dev/null +++ b/tests/tools/test_cronjob_run_immediate.py @@ -0,0 +1,81 @@ +"""Tests for cronjob action='run' immediate execution (#41037). + +Before this fix, `cronjob(action='run')` only set next_run_at=now and returned +success, relying on the scheduler ticker to actually run the job. With no +gateway/ticker active (e.g. a CLI-only Windows setup) the job never executed and +last_run_at stayed null forever. 
Now action='run' claims the job (at-most-once,
+blocking a concurrent tick) and fires it inline via the shared run_one_job body.
+"""
+import json
+from unittest.mock import patch
+
+from tools.cronjob_tools import cronjob, _execute_job_now
+
+
+_JOB = {"id": "job-run-1", "name": "manual run", "prompt": "hi",
+        "schedule": {"kind": "cron", "expr": "0 9 * * *"}}
+
+
+class TestCronjobRunExecutesImmediately:
+    def test_run_action_claims_and_fires_via_run_one_job(self):
+        """action='run' must claim the job then fire it through run_one_job."""
+        ran = {"job": "after-run", "last_status": "ok", "last_error": None}
+        with patch("tools.cronjob_tools.resolve_job_ref", return_value=dict(_JOB)), \
+             patch("tools.cronjob_tools.claim_job_for_fire", return_value=True) as m_claim, \
+             patch("cron.scheduler.run_one_job", return_value=True) as m_run, \
+             patch("tools.cronjob_tools.get_job", return_value=ran):  # re-read after the run
+            out = json.loads(cronjob(action="run", job_id="job-run-1"))
+
+        assert out["success"] is True
+        assert out["job"]["executed"] is True
+        assert out["job"]["execution_success"] is True
+        m_claim.assert_called_once_with("job-run-1")  # at-most-once claim taken
+        m_run.assert_called_once()  # fired via the shared body
+
+    def test_run_skips_when_claim_lost(self):
+        """If the scheduler already holds the fire claim, do NOT double-run."""
+        with patch("tools.cronjob_tools.resolve_job_ref", return_value=dict(_JOB)), \
+             patch("tools.cronjob_tools.claim_job_for_fire", return_value=False), \
+             patch("cron.scheduler.run_one_job") as m_run, \
+             patch("tools.cronjob_tools.get_job", return_value=dict(_JOB)):
+            out = json.loads(cronjob(action="run", job_id="job-run-1"))
+
+        assert out["success"] is True  # the tool call succeeds; only the run is skipped
+        assert out["job"]["executed"] is False
+        assert out["job"]["execution_success"] is False
+        assert "execution_skipped" in out["job"]
+        m_run.assert_not_called()  # claim lost -> never fired
+
+    def test_run_reports_failure_from_last_status(self):
+        """A failed run is reported via the re-read job's last_status/last_error."""
+        failed = {"id": "job-run-1", "last_status": "error", "last_error": "provider 500"}
+        with patch("tools.cronjob_tools.resolve_job_ref", return_value=dict(_JOB)), \
+             patch("tools.cronjob_tools.claim_job_for_fire", return_value=True), \
+             patch("cron.scheduler.run_one_job", return_value=True), \
+             patch("tools.cronjob_tools.get_job", return_value=failed):
+            out = json.loads(cronjob(action="run", job_id="job-run-1"))
+
+        assert out["job"]["executed"] is True  # claimed and fired, even though it failed
+        assert out["job"]["execution_success"] is False  # re-read last_status is "error"
+        assert out["job"]["execution_error"] == "provider 500"
+
+    def test_execute_job_now_bails_without_claim(self):
+        """_execute_job_now never calls run_one_job when the claim is lost."""
+        with patch("tools.cronjob_tools.claim_job_for_fire", return_value=False), \
+             patch("cron.scheduler.run_one_job") as m_run:
+            res = _execute_job_now(dict(_JOB))
+        assert res["claimed"] is False
+        assert res["success"] is False
+        m_run.assert_not_called()
+
+    def test_execute_job_now_marks_failure_on_exception(self):
+        """An exception during fire is captured, marked failed, not propagated."""
+        with patch("tools.cronjob_tools.claim_job_for_fire", return_value=True), \
+             patch("cron.scheduler.run_one_job", side_effect=RuntimeError("boom")), \
+             patch("tools.cronjob_tools.mark_job_run") as m_mark, \
+             patch("tools.cronjob_tools.get_job", return_value=dict(_JOB)):
+            res = _execute_job_now(dict(_JOB))
+        assert res["claimed"] is True  # the claim was won before run_one_job raised
+        assert res["success"] is False
+        assert "boom" in res["error"]
+        m_mark.assert_called_once()  # the failure was recorded through mark_job_run
diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py
index 0bd62b2fc37..3339b823941 100644
--- a/tools/cronjob_tools.py
+++ b/tools/cronjob_tools.py
@@ -21,14 +21,16 @@ sys.path.insert(0, str(Path(__file__).parent.parent)) from cron.jobs import ( AmbiguousJobReference, + claim_job_for_fire, create_job, + get_job, list_jobs, + mark_job_run, parse_schedule, pause_job, remove_job, resolve_job_ref, resume_job, - trigger_job,
update_job, )
@@ -472,6 +474,51 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
     return result
 
 
+def _execute_job_now(job: Dict[str, Any]) -> Dict[str, Any]:
+    """Execute a cron job immediately, outside the scheduler tick.
+
+    Claims the job first via ``claim_job_for_fire`` — the same at-most-once
+    CAS the scheduler/external-provider fire path uses — so a concurrent
+    gateway ticker cannot also fire it (the claim both blocks a duplicate fire
+    and advances ``next_run_at`` for recurring jobs). A lost claim is a no-op.
+
+    Firing is delegated to ``run_one_job`` — the shared
+    execute→save→deliver→mark body the ticker and external providers use — so
+    failure delivery, ``[SILENT]`` handling, and live-adapter delivery can't
+    drift between paths.
+
+    Exceptions raised while importing, claiming or firing are caught, not raised.
+    Returns {"claimed": bool, "success": bool, "error": str|None}; ``claimed``
+    is True only when this call actually won the claim.
+    """
+    job_id = job["id"]
+    claimed = False
+    try:
+        from cron.scheduler import run_one_job
+        # At-most-once claim: bail without running if a tick/other fire owns it.
+        if not claim_job_for_fire(job_id):
+            return {"claimed": False, "success": False,
+                    "error": "Job is already being fired by the scheduler; not run again."}
+        claimed = True
+
+        # run_one_job records last_run_at/last_status via mark_job_run (which
+        # also clears the fire claim) and returns True iff it processed the job.
+        processed = run_one_job(job)
+        refreshed = get_job(job_id) or {}
+        ok = refreshed.get("last_status") == "ok"
+        return {"claimed": True, "success": bool(processed and ok),
+                "error": refreshed.get("last_error")}
+    except Exception as e:
+        logger.error("Failed to execute cron job %s immediately: %s", job_id, e)
+        # Only close out a claim we hold; never clear another fire's claim.
+        if claimed:
+            try:
+                mark_job_run(job_id, False, str(e))
+            except Exception:
+                logger.exception("Could not record failed run for cron job %s", job_id)
+        return {"claimed": claimed, "success": False, "error": str(e)}
+
+
 def cronjob(
     action: str,
     job_id: Optional[str] = None,
@@ -640,8 +687,23 @@ def cronjob(
             return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
 
         if normalized in {"run", "run_now", "trigger"}:
-            updated = trigger_job(job_id)
-            return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
+            # Execute the job immediately rather than only scheduling it for
+            # the next scheduler tick — a manual `run` should actually run even
+            # when no gateway/ticker is active (the #41037 case). The claim in
+            # _execute_job_now advances next_run_at and blocks a double fire.
+            exec_result = _execute_job_now(job)
+            # Re-read so the response reflects the post-run last_run_at/last_status.
+            result = _format_job(get_job(job_id) or {"id": job_id})
+            result["executed"] = exec_result.get("claimed", False)
+            result["execution_success"] = exec_result.get("success", False)
+            if not exec_result.get("claimed", False):
+                # Report why nothing ran: a lost claim, or an error before it.
+                result["execution_skipped"] = exec_result.get("error") or (
+                    "Already being fired by the scheduler; not run again."
+                )
+            elif exec_result.get("error"):
+                result["execution_error"] = exec_result["error"]
+            return json.dumps({"success": True, "job": result}, indent=2)
 
         if normalized == "update":
             updates: Dict[str, Any] = {}