Merge pull request #50025 from NousResearch/salvage/cron-run-immediate

fix(cron): execute job immediately on action=run
This commit is contained in:
kshitij 2026-06-21 13:53:13 +05:30 committed by GitHub
commit f6a504d088
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 154 additions and 4 deletions

View file

@ -353,7 +353,14 @@ def _job_action(action: str, job_id: str, success_verb: str) -> int:
if action in {"resume", "run"} and result.get("job", {}).get("next_run_at"):
print(f" Next run: {result['job']['next_run_at']}")
if action == "run":
print(" It will run on the next scheduler tick.")
job = result.get("job", {})
if job.get("executed"):
outcome = "succeeded" if job.get("execution_success") else "failed"
print(f" Ran now: {outcome}.")
elif job.get("execution_skipped"):
print(f" {job['execution_skipped']}")
else:
print(" It will run on the next scheduler tick.")
return 0

View file

@ -0,0 +1,81 @@
"""Tests for cronjob action='run' immediate execution (#41037).
Before this fix, `cronjob(action='run')` only set next_run_at=now and returned
success, relying on the scheduler ticker to actually run the job. With no
gateway/ticker active (e.g. a CLI-only Windows setup) the job never executed and
last_run_at stayed null forever. Now action='run' claims the job (at-most-once,
blocking a concurrent tick) and fires it inline via the shared run_one_job body.
"""
import json
from unittest.mock import patch
from tools.cronjob_tools import cronjob, _execute_job_now
# Minimal job record shared by the tests in this module. Tests pass
# ``dict(_JOB)`` so each one works on its own shallow copy.
_JOB = {"id": "job-run-1", "name": "manual run", "prompt": "hi",
"schedule": {"kind": "cron", "expr": "0 9 * * *"}}
class TestCronjobRunExecutesImmediately:
    """Checks that ``action='run'`` claims a job and fires it inline."""

    @staticmethod
    def _run_action():
        """Invoke the tool's run action for the fixture job and decode its JSON."""
        return json.loads(cronjob(action="run", job_id="job-run-1"))

    def test_run_action_claims_and_fires_via_run_one_job(self):
        """A won claim leads to exactly one run_one_job call and a success report."""
        post_run_job = {"job": "after-run", "last_status": "ok", "last_error": None}
        with patch("tools.cronjob_tools.resolve_job_ref", return_value=dict(_JOB)), \
                patch("tools.cronjob_tools.claim_job_for_fire", return_value=True) as claim_mock, \
                patch("cron.scheduler.run_one_job", return_value=True) as run_mock, \
                patch("tools.cronjob_tools.get_job", return_value=post_run_job):
            payload = self._run_action()
        job_view = payload["job"]
        assert payload["success"] is True
        assert job_view["executed"] is True
        assert job_view["execution_success"] is True
        # The claim is requested once, for this job id only.
        claim_mock.assert_called_once_with("job-run-1")
        # The job body is fired exactly once through run_one_job.
        run_mock.assert_called_once()

    def test_run_skips_when_claim_lost(self):
        """A lost claim means the job is reported as skipped and never fired."""
        with patch("tools.cronjob_tools.resolve_job_ref", return_value=dict(_JOB)), \
                patch("tools.cronjob_tools.claim_job_for_fire", return_value=False), \
                patch("cron.scheduler.run_one_job") as run_mock, \
                patch("tools.cronjob_tools.get_job", return_value=dict(_JOB)):
            payload = self._run_action()
        job_view = payload["job"]
        assert payload["success"] is True
        assert job_view["executed"] is False
        assert job_view["execution_success"] is False
        assert "execution_skipped" in job_view
        # Without the claim, run_one_job must stay untouched.
        run_mock.assert_not_called()

    def test_run_reports_failure_from_last_status(self):
        """The re-read job's last_status/last_error drive the failure report."""
        failed_job = {"id": "job-run-1", "last_status": "error", "last_error": "provider 500"}
        with patch("tools.cronjob_tools.resolve_job_ref", return_value=dict(_JOB)), \
                patch("tools.cronjob_tools.claim_job_for_fire", return_value=True), \
                patch("cron.scheduler.run_one_job", return_value=True), \
                patch("tools.cronjob_tools.get_job", return_value=failed_job):
            payload = self._run_action()
        job_view = payload["job"]
        assert job_view["executed"] is True
        assert job_view["execution_success"] is False
        assert job_view["execution_error"] == "provider 500"

    def test_execute_job_now_bails_without_claim(self):
        """_execute_job_now returns early, without firing, when the claim is lost."""
        with patch("tools.cronjob_tools.claim_job_for_fire", return_value=False), \
                patch("cron.scheduler.run_one_job") as run_mock:
            outcome = _execute_job_now(dict(_JOB))
        assert outcome["claimed"] is False
        assert outcome["success"] is False
        run_mock.assert_not_called()

    def test_execute_job_now_marks_failure_on_exception(self):
        """An exception while firing is returned as a failure and marked once."""
        with patch("tools.cronjob_tools.claim_job_for_fire", return_value=True), \
                patch("cron.scheduler.run_one_job", side_effect=RuntimeError("boom")), \
                patch("tools.cronjob_tools.mark_job_run") as mark_mock, \
                patch("tools.cronjob_tools.get_job", return_value=dict(_JOB)):
            outcome = _execute_job_now(dict(_JOB))
        assert outcome["claimed"] is True
        assert outcome["success"] is False
        assert "boom" in outcome["error"]
        mark_mock.assert_called_once()

View file

@ -21,14 +21,16 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
from cron.jobs import (
AmbiguousJobReference,
claim_job_for_fire,
create_job,
get_job,
list_jobs,
mark_job_run,
parse_schedule,
pause_job,
remove_job,
resolve_job_ref,
resume_job,
trigger_job,
update_job,
)
@ -472,6 +474,51 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
return result
def _execute_job_now(job: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a cron job immediately, outside the scheduler tick.

    Atomically claims the job first via ``claim_job_for_fire`` -- the same
    at-most-once CAS the scheduler/external-provider fire path uses -- so a
    concurrently-running gateway ticker cannot also fire it (the claim both
    blocks a duplicate fire and advances ``next_run_at`` for recurring jobs).
    If the claim is lost (another fire is in flight), this is a no-op.

    The actual firing is delegated to ``run_one_job`` -- the single shared
    execute -> save -> deliver -> mark body the ticker and external providers
    use -- so failure delivery, ``[SILENT]`` handling, and live-adapter
    delivery stay identical across paths and can't drift.

    Args:
        job: Job record. Only ``job["id"]`` is read here; the whole dict is
            passed through to ``run_one_job``.

    Returns:
        ``{"claimed": bool, "success": bool, "error": str | None}``.
        ``success`` is true only when ``run_one_job`` returned a truthy value
        and the re-read job has ``last_status == "ok"``. Any exception raised
        while importing, claiming, firing, or re-reading is logged and
        reported as ``{"claimed": True, "success": False, "error": str(e)}``.
        ``claimed`` stays ``True`` on that path so callers surface ``error``
        rather than treating the call as a skipped run.
    """
    job_id = job["id"]
    # Tracks whether *this call* acquired the fire claim, so the error path
    # only touches run bookkeeping for a claim it actually owns.
    claim_held = False
    try:
        from cron.scheduler import run_one_job

        # At-most-once claim: bail without running if a tick/other fire owns it.
        claim_held = bool(claim_job_for_fire(job_id))
        if not claim_held:
            return {
                "claimed": False,
                "success": False,
                "error": "Job is already being fired by the scheduler; not run again.",
            }

        # run_one_job records last_run_at/last_status via mark_job_run (which
        # also clears the fire claim) and returns True iff it processed the job.
        processed = run_one_job(job)
        refreshed = get_job(job_id) or {}
        ok = refreshed.get("last_status") == "ok"
        return {
            "claimed": True,
            "success": bool(processed and ok),
            "error": refreshed.get("last_error"),
        }
    except Exception as e:
        logger.error("Failed to execute cron job %s immediately: %s", job_id, e)
        if claim_held:
            # Record the failure (and release our claim). Skipped when the
            # error happened before we held the claim: marking then could
            # clear a claim owned by another in-flight fire.
            try:
                mark_job_run(job_id, False, str(e))
            except Exception as mark_err:
                # Best effort: the original error is still returned below, but
                # leave a trace instead of swallowing the bookkeeping failure.
                logger.warning(
                    "Could not record failed run for cron job %s: %s",
                    job_id,
                    mark_err,
                )
        return {"claimed": True, "success": False, "error": str(e)}
def cronjob(
action: str,
job_id: Optional[str] = None,
@ -640,8 +687,23 @@ def cronjob(
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
if normalized in {"run", "run_now", "trigger"}:
updated = trigger_job(job_id)
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
# Execute the job immediately rather than only scheduling it for the
# next scheduler tick — a manual `run` should actually run, even when
# no gateway/ticker is active (the #41037 case). The claim inside
# _execute_job_now advances next_run_at and blocks a concurrent tick
# from double-firing.
exec_result = _execute_job_now(job)
# Re-read so the response reflects the post-run last_run_at/last_status.
result = _format_job(get_job(job_id) or {"id": job_id})
result["executed"] = exec_result.get("claimed", False)
result["execution_success"] = exec_result.get("success", False)
if not exec_result.get("claimed", False):
result["execution_skipped"] = (
"Already being fired by the scheduler; not run again."
)
elif exec_result.get("error"):
result["execution_error"] = exec_result["error"]
return json.dumps({"success": True, "job": result}, indent=2)
if normalized == "update":
updates: Dict[str, Any] = {}