fix(cron): execute job immediately on action='run'

`cronjob(action='run')` (and `hermes cron run`) only set `next_run_at = now`
and returned success, relying on the scheduler ticker to actually execute the
job on its next tick. When no gateway/ticker is running — a CLI-only setup, or
the Windows case in #41037 — the job never executed: `run` reported success,
but `last_run_at` stayed null forever, no output, no delivery.

A manual `run` should actually run. `_execute_job_now` now:

- **claims the job via `claim_job_for_fire`** — the same at-most-once CAS the
  scheduler/external-provider fire path uses. This both advances `next_run_at`
  for recurring jobs and blocks a concurrently-running gateway ticker from
  double-firing the same job; if the claim is lost, the run is skipped (the
  tool reports `execution_skipped`). This closes the double-fire race that a
  bare `advance_next_run` left open (a tick whose `get_due_jobs` already
  captured the job between trigger and advance would still fire it).
- **delegates firing to `run_one_job`** — the single shared
  execute→save→deliver→mark body the ticker and external providers use — so
  failure delivery, `[SILENT]` handling, and live-adapter delivery stay
  identical across paths and can't drift. (The original salvage re-implemented
  this sequence inline and had already dropped failure delivery + `[SILENT]`.)

The tool response carries `executed`, `execution_success`, and either
`execution_error` or `execution_skipped`. The `hermes cron run` CLI message no
longer claims "It will run on the next scheduler tick" — it reports the actual
"Ran now: succeeded/failed" outcome (or the skip).

Salvaged from #41130 by @kyssta-exe (authorship preserved); reworked to reuse
`claim_job_for_fire` + `run_one_job` per review rather than re-implementing the
fire sequence inline. Adds tests for the claim-then-fire path, claim-lost skip,
failure reporting, and exception capture.

Fixes #41037

Co-authored-by: kyssta-exe <kyssta-exe@users.noreply.github.com>
This commit is contained in:
kyssta-exe 2026-06-21 13:10:54 +05:30
parent 35752fc3a5
commit 65d7c7fafd
3 changed files with 154 additions and 4 deletions

View file

@ -313,7 +313,14 @@ def _job_action(action: str, job_id: str, success_verb: str) -> int:
if action in {"resume", "run"} and result.get("job", {}).get("next_run_at"):
print(f" Next run: {result['job']['next_run_at']}")
if action == "run":
print(" It will run on the next scheduler tick.")
job = result.get("job", {})
if job.get("executed"):
outcome = "succeeded" if job.get("execution_success") else "failed"
print(f" Ran now: {outcome}.")
elif job.get("execution_skipped"):
print(f" {job['execution_skipped']}")
else:
print(" It will run on the next scheduler tick.")
return 0

View file

@ -0,0 +1,81 @@
"""Tests for cronjob action='run' immediate execution (#41037).
Before this fix, `cronjob(action='run')` only set next_run_at=now and returned
success, relying on the scheduler ticker to actually run the job. With no
gateway/ticker active (e.g. a CLI-only Windows setup) the job never executed and
last_run_at stayed null forever. Now action='run' claims the job (at-most-once,
blocking a concurrent tick) and fires it inline via the shared run_one_job body.
"""
import json
from unittest.mock import patch
from tools.cronjob_tools import cronjob, _execute_job_now
_JOB = {"id": "job-run-1", "name": "manual run", "prompt": "hi",
"schedule": {"kind": "cron", "expr": "0 9 * * *"}}
class TestCronjobRunExecutesImmediately:
def test_run_action_claims_and_fires_via_run_one_job(self):
"""action='run' must claim the job then fire it through run_one_job."""
ran = {"job": "after-run", "last_status": "ok", "last_error": None}
with patch("tools.cronjob_tools.resolve_job_ref", return_value=dict(_JOB)), \
patch("tools.cronjob_tools.claim_job_for_fire", return_value=True) as m_claim, \
patch("cron.scheduler.run_one_job", return_value=True) as m_run, \
patch("tools.cronjob_tools.get_job", return_value=ran):
out = json.loads(cronjob(action="run", job_id="job-run-1"))
assert out["success"] is True
assert out["job"]["executed"] is True
assert out["job"]["execution_success"] is True
m_claim.assert_called_once_with("job-run-1") # at-most-once claim taken
m_run.assert_called_once() # fired via the shared body
def test_run_skips_when_claim_lost(self):
"""If the scheduler already holds the fire claim, do NOT double-run."""
with patch("tools.cronjob_tools.resolve_job_ref", return_value=dict(_JOB)), \
patch("tools.cronjob_tools.claim_job_for_fire", return_value=False), \
patch("cron.scheduler.run_one_job") as m_run, \
patch("tools.cronjob_tools.get_job", return_value=dict(_JOB)):
out = json.loads(cronjob(action="run", job_id="job-run-1"))
assert out["success"] is True
assert out["job"]["executed"] is False
assert out["job"]["execution_success"] is False
assert "execution_skipped" in out["job"]
m_run.assert_not_called() # claim lost -> never fired
def test_run_reports_failure_from_last_status(self):
"""A failed run is reported via the re-read job's last_status/last_error."""
failed = {"id": "job-run-1", "last_status": "error", "last_error": "provider 500"}
with patch("tools.cronjob_tools.resolve_job_ref", return_value=dict(_JOB)), \
patch("tools.cronjob_tools.claim_job_for_fire", return_value=True), \
patch("cron.scheduler.run_one_job", return_value=True), \
patch("tools.cronjob_tools.get_job", return_value=failed):
out = json.loads(cronjob(action="run", job_id="job-run-1"))
assert out["job"]["executed"] is True
assert out["job"]["execution_success"] is False
assert out["job"]["execution_error"] == "provider 500"
def test_execute_job_now_bails_without_claim(self):
"""_execute_job_now never calls run_one_job when the claim is lost."""
with patch("tools.cronjob_tools.claim_job_for_fire", return_value=False), \
patch("cron.scheduler.run_one_job") as m_run:
res = _execute_job_now(dict(_JOB))
assert res["claimed"] is False
assert res["success"] is False
m_run.assert_not_called()
def test_execute_job_now_marks_failure_on_exception(self):
"""An exception during fire is captured, marked failed, not propagated."""
with patch("tools.cronjob_tools.claim_job_for_fire", return_value=True), \
patch("cron.scheduler.run_one_job", side_effect=RuntimeError("boom")), \
patch("tools.cronjob_tools.mark_job_run") as m_mark, \
patch("tools.cronjob_tools.get_job", return_value=dict(_JOB)):
res = _execute_job_now(dict(_JOB))
assert res["claimed"] is True
assert res["success"] is False
assert "boom" in res["error"]
m_mark.assert_called_once()

View file

@ -21,14 +21,16 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
from cron.jobs import (
AmbiguousJobReference,
claim_job_for_fire,
create_job,
get_job,
list_jobs,
mark_job_run,
parse_schedule,
pause_job,
remove_job,
resolve_job_ref,
resume_job,
trigger_job,
update_job,
)
@ -472,6 +474,51 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
return result
def _execute_job_now(job: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a cron job immediately, outside the scheduler tick.
Atomically claims the job first via ``claim_job_for_fire`` the same
at-most-once CAS the scheduler/external-provider fire path uses so a
concurrently-running gateway ticker cannot also fire it (the claim both
blocks a duplicate fire and advances ``next_run_at`` for recurring jobs).
If the claim is lost (another fire is in flight), this is a no-op.
The actual firing is delegated to ``run_one_job`` the single shared
executesavedelivermark body the ticker and external providers use so
failure delivery, ``[SILENT]`` handling, and live-adapter delivery stay
identical across paths and can't drift.
Returns {"claimed": bool, "success": bool, "error": str|None}.
"""
job_id = job["id"]
try:
from cron.scheduler import run_one_job
# At-most-once claim: bail without running if a tick/other fire owns it.
if not claim_job_for_fire(job_id):
return {"claimed": False, "success": False,
"error": "Job is already being fired by the scheduler; not run again."}
# run_one_job records last_run_at/last_status via mark_job_run (which
# also clears the fire claim) and returns True iff it processed the job.
processed = run_one_job(job)
refreshed = get_job(job_id) or {}
ok = refreshed.get("last_status") == "ok"
return {
"claimed": True,
"success": bool(processed and ok),
"error": refreshed.get("last_error"),
}
except Exception as e:
logger.error("Failed to execute cron job %s immediately: %s", job_id, e)
try:
mark_job_run(job_id, False, str(e))
except Exception:
pass
return {"claimed": True, "success": False, "error": str(e)}
def cronjob(
action: str,
job_id: Optional[str] = None,
@ -640,8 +687,23 @@ def cronjob(
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
if normalized in {"run", "run_now", "trigger"}:
updated = trigger_job(job_id)
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
# Execute the job immediately rather than only scheduling it for the
# next scheduler tick — a manual `run` should actually run, even when
# no gateway/ticker is active (the #41037 case). The claim inside
# _execute_job_now advances next_run_at and blocks a concurrent tick
# from double-firing.
exec_result = _execute_job_now(job)
# Re-read so the response reflects the post-run last_run_at/last_status.
result = _format_job(get_job(job_id) or {"id": job_id})
result["executed"] = exec_result.get("claimed", False)
result["execution_success"] = exec_result.get("success", False)
if not exec_result.get("claimed", False):
result["execution_skipped"] = (
"Already being fired by the scheduler; not run again."
)
elif exec_result.get("error"):
result["execution_error"] = exec_result["error"]
return json.dumps({"success": True, "job": result}, indent=2)
if normalized == "update":
updates: Dict[str, Any] = {}