Merge pull request #50062 from NousResearch/salvage/cron-missed-grace-runonce

fix(cron): run missed-grace jobs once instead of deferring forever
This commit is contained in:
kshitij 2026-06-22 15:50:54 +05:30 committed by GitHub
commit 2649f7360c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 87 additions and 17 deletions

View file

@ -1240,10 +1240,16 @@ def claim_job_for_fire(job_id: str, *, claim_ttl_seconds: int = 300) -> bool:
def get_due_jobs() -> List[Dict[str, Any]]:
"""Get all jobs that are due to run now.
For recurring jobs (cron/interval), if the scheduled time is stale
(more than one period in the past, e.g. because the gateway was down),
the job is fast-forwarded to the next future run instead of firing
immediately. This prevents a burst of missed jobs on gateway restart.
For recurring jobs (cron/interval), if the scheduled time is stale (more
than one period in the past, e.g. because the gateway was down OR because a
long-running previous execution overran the interval), the accumulated
missed runs are collapsed ``next_run_at`` is fast-forwarded to the next
future occurrence so a backlog does NOT burst-fire on restart but the job
still fires ONCE now. This prevents the perpetual-defer loop (#33315) where
a job whose runtime exceeds ``interval + grace`` would be skipped forever.
Note: firing once on catch-up flows through ``mark_job_run``, so a job with
a ``repeat.times`` limit consumes one of its runs on that catch-up fire.
"""
with _jobs_lock():
return _get_due_jobs_locked()
@ -1351,25 +1357,34 @@ def _get_due_jobs_locked() -> List[Dict[str, Any]]:
# the next future occurrence instead of firing a stale run.
grace = _compute_grace_seconds(schedule)
if kind in {"cron", "interval"} and (now - next_run_dt).total_seconds() > grace:
# Job is past its catch-up grace window — this is a stale missed run.
# Grace scales with schedule period: daily=2h, hourly=30m, 10min=5m.
# Job is past its catch-up grace window — skip accumulated
# missed runs but still execute once now to avoid deferring
# indefinitely (e.g. a long-running job just finished).
new_next = compute_next_run(schedule, now.isoformat())
if new_next:
logger.info(
"Job '%s' missed its scheduled time (%s, grace=%ds). "
"Fast-forwarding to next run: %s",
"Running now; next run provisionally set to: %s "
"(re-anchored on completion)",
job.get("name", job["id"]),
next_run,
grace,
new_next,
)
# Update the job in storage
# Persist the fast-forward to storage now (skip accumulated
# slots). In the built-in ticker path this is shortly
# overwritten by advance_next_run + mark_job_run, but it is
# NOT redundant: it (a) protects the crash window between
# here and mark_job_run, and (b) covers the external
# fire_due provider path, which does not call
# advance_next_run. mark_job_run re-anchors next_run_at off
# the actual completion time, so this value is provisional.
for rj in raw_jobs:
if rj["id"] == job["id"]:
rj["next_run_at"] = new_next
needs_save = True
break
continue # Skip this run
# Fall through to due.append(job) — execute once now
due.append(job)

View file

@ -685,10 +685,11 @@ class TestGetDueJobs:
assert len(due) == 1
assert due[0]["id"] == job["id"]
def test_stale_past_due_skipped(self, tmp_cron_dir):
"""Recurring jobs past their dynamic grace window are fast-forwarded, not fired.
def test_stale_past_due_runs_once_and_fast_forwards(self, tmp_cron_dir):
"""Recurring jobs past their grace window run once now and fast-forward next_run_at.
For an hourly job, grace = 30 min. Setting 35 min late exceeds the window.
The job should be returned as due (execute once) with next_run_at in the future.
"""
job = create_job(prompt="Stale", schedule="every 1h")
# Force next_run_at to 35 minutes ago (beyond the 30-min grace for hourly)
@ -697,13 +698,62 @@ class TestGetDueJobs:
save_jobs(jobs)
due = get_due_jobs()
assert len(due) == 0
# next_run_at should be fast-forwarded to the future
# Job is returned as due — execute once now instead of skipping
assert len(due) == 1
assert due[0]["id"] == job["id"]
# next_run_at should be fast-forwarded to the future (accumulated slots skipped)
updated = get_job(job["id"])
from cron.jobs import _ensure_aware, _hermes_now
next_dt = _ensure_aware(datetime.fromisoformat(updated["next_run_at"]))
assert next_dt > _hermes_now()
def test_long_execution_does_not_perpetually_defer(self, tmp_cron_dir, monkeypatch):
"""#33315: a recurring job whose runtime exceeds interval+grace must still
run once when the tick comes back, not skip forever.
Reproduces the production loop: a 5-min interval job whose previous run
overran the interval, leaving next_run_at ~11 min in the past beyond
the 150s grace for a 5m interval. The job must be returned as due (run
once) AND have next_run_at fast-forwarded (so accumulated missed slots
don't all fire)."""
from cron.jobs import _ensure_aware, _hermes_now
job = create_job(prompt="Long job", schedule="every 5m")
jobs = load_jobs()
# 11 minutes ago: > grace (150s for a 5m interval) — the "still running" miss.
stale = (_hermes_now() - timedelta(minutes=11)).isoformat()
jobs[0]["next_run_at"] = stale
jobs[0]["last_run_at"] = (_hermes_now() - timedelta(minutes=1)).isoformat()
save_jobs(jobs)
due = get_due_jobs()
assert [j["id"] for j in due] == [job["id"]], "long-execution job was skipped (perpetual-defer bug)"
# next_run_at fast-forwarded into the future (no burst of missed slots).
nxt = _ensure_aware(datetime.fromisoformat(get_job(job["id"])["next_run_at"]))
assert nxt > _hermes_now()
def test_stale_repeat_limited_job_consumes_one_run_on_catchup(self, tmp_cron_dir, monkeypatch):
"""#33315 behavior note: a stale recurring job with a repeat.times limit
fires ONCE on catch-up and consumes one of its runs (it is no longer
silently skipped). Pins the documented repeat-count interaction so it
isn't changed accidentally."""
from cron.jobs import _hermes_now
job = create_job(prompt="Limited", schedule="every 5m", repeat=3)
jobs = load_jobs()
jobs[0]["next_run_at"] = (_hermes_now() - timedelta(minutes=11)).isoformat()
jobs[0]["last_run_at"] = (_hermes_now() - timedelta(minutes=11)).isoformat()
save_jobs(jobs)
# The stale job is returned to fire once (not skipped).
due = get_due_jobs()
assert [j["id"] for j in due] == [job["id"]]
# Simulate the run completing: mark_job_run increments completed.
mark_job_run(job["id"], True)
survived = get_job(job["id"])
assert survived is not None, "job should survive (3 > 1 completed)"
assert survived["repeat"]["completed"] == 1
def test_future_not_returned(self, tmp_cron_dir):
create_job(prompt="Not yet", schedule="every 1h")
due = get_due_jobs()
@ -911,10 +961,15 @@ class TestGetDueJobs:
}]
)
# The wall-clock time has already passed, so this follows the existing
# stale-run fast-forward behavior instead of the timezone-migration
# repair path for future wall-clock runs.
assert get_due_jobs() == []
# The wall-clock time has already passed, so this does NOT take the
# timezone-migration repair path (which is for still-future wall-clock
# runs). It falls through to the stale-grace path, which — since #33315
# — runs the job once now and fast-forwards next_run_at (rather than
# skipping). The key assertion for THIS test is that the repaired
# next_run_at is the normal next cron occurrence, not the migration
# path's same-day rebase.
due = get_due_jobs()
assert [j["id"] for j in due] == ["cron-tz-missed"] # runs once now (#33315)
repaired = datetime.fromisoformat(get_job("cron-tz-missed")["next_run_at"])
assert repaired == datetime(2026, 5, 26, 9, 0, 0, tzinfo=current_tz)