diff --git a/cron/jobs.py b/cron/jobs.py index 0bf15e0a29d..ed0ac61fb21 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -1240,10 +1240,16 @@ def claim_job_for_fire(job_id: str, *, claim_ttl_seconds: int = 300) -> bool: def get_due_jobs() -> List[Dict[str, Any]]: """Get all jobs that are due to run now. - For recurring jobs (cron/interval), if the scheduled time is stale - (more than one period in the past, e.g. because the gateway was down), - the job is fast-forwarded to the next future run instead of firing - immediately. This prevents a burst of missed jobs on gateway restart. + For recurring jobs (cron/interval), if the scheduled time is stale (more + than one period in the past, e.g. because the gateway was down OR because a + long-running previous execution overran the interval), the accumulated + missed runs are collapsed — ``next_run_at`` is fast-forwarded to the next + future occurrence so a backlog does NOT burst-fire on restart — but the job + still fires ONCE now. This prevents the perpetual-defer loop (#33315) where + a job whose runtime exceeds ``interval + grace`` would be skipped forever. + + Note: firing once on catch-up flows through ``mark_job_run``, so a job with + a ``repeat.times`` limit consumes one of its runs on that catch-up fire. """ with _jobs_lock(): return _get_due_jobs_locked() @@ -1351,25 +1357,34 @@ def _get_due_jobs_locked() -> List[Dict[str, Any]]: # the next future occurrence instead of firing a stale run. grace = _compute_grace_seconds(schedule) if kind in {"cron", "interval"} and (now - next_run_dt).total_seconds() > grace: - # Job is past its catch-up grace window — this is a stale missed run. - # Grace scales with schedule period: daily=2h, hourly=30m, 10min=5m. + # Job is past its catch-up grace window — skip accumulated + # missed runs but still execute once now to avoid deferring + # indefinitely (e.g. a long-running job just finished). new_next = compute_next_run(schedule, now.isoformat()) if new_next: logger.info( "Job '%s' missed its scheduled time (%s, grace=%ds). " - "Fast-forwarding to next run: %s", + "Running now; next run provisionally set to: %s " + "(re-anchored on completion)", job.get("name", job["id"]), next_run, grace, new_next, ) - # Update the job in storage + # Persist the fast-forward to storage now (skip accumulated + # slots). In the built-in ticker path this is shortly + # overwritten by advance_next_run + mark_job_run, but it is + # NOT redundant: it (a) protects the crash window between + # here and mark_job_run, and (b) covers the external + # fire_due provider path, which does not call + # advance_next_run. mark_job_run re-anchors next_run_at off + # the actual completion time, so this value is provisional. for rj in raw_jobs: if rj["id"] == job["id"]: rj["next_run_at"] = new_next needs_save = True break - continue # Skip this run + # Fall through to due.append(job) — execute once now due.append(job) diff --git a/tests/cron/test_jobs.py b/tests/cron/test_jobs.py index f54041d0573..b554d19983b 100644 --- a/tests/cron/test_jobs.py +++ b/tests/cron/test_jobs.py @@ -685,10 +685,11 @@ class TestGetDueJobs: assert len(due) == 1 assert due[0]["id"] == job["id"] - def test_stale_past_due_skipped(self, tmp_cron_dir): - """Recurring jobs past their dynamic grace window are fast-forwarded, not fired. + def test_stale_past_due_runs_once_and_fast_forwards(self, tmp_cron_dir): + """Recurring jobs past their grace window run once now and fast-forward next_run_at. For an hourly job, grace = 30 min. Setting 35 min late exceeds the window. + The job should be returned as due (execute once) with next_run_at in the future. """ job = create_job(prompt="Stale", schedule="every 1h") # Force next_run_at to 35 minutes ago (beyond the 30-min grace for hourly) @@ -697,13 +698,62 @@ class TestGetDueJobs: save_jobs(jobs) due = get_due_jobs() - assert len(due) == 0 - # next_run_at should be fast-forwarded to the future + # Job is returned as due — execute once now instead of skipping + assert len(due) == 1 + assert due[0]["id"] == job["id"] + # next_run_at should be fast-forwarded to the future (accumulated slots skipped) updated = get_job(job["id"]) from cron.jobs import _ensure_aware, _hermes_now next_dt = _ensure_aware(datetime.fromisoformat(updated["next_run_at"])) assert next_dt > _hermes_now() + + def test_long_execution_does_not_perpetually_defer(self, tmp_cron_dir, monkeypatch): + """#33315: a recurring job whose runtime exceeds interval+grace must still + run once when the tick comes back, not skip forever. + + Reproduces the production loop: a 5-min interval job whose previous run + overran the interval, leaving next_run_at ~11 min in the past — beyond + the 150s grace for a 5m interval. The job must be returned as due (run + once) AND have next_run_at fast-forwarded (so accumulated missed slots + don't all fire).""" + from cron.jobs import _ensure_aware, _hermes_now + job = create_job(prompt="Long job", schedule="every 5m") + jobs = load_jobs() + # 11 minutes ago: > grace (150s for a 5m interval) — the "still running" miss. + stale = (_hermes_now() - timedelta(minutes=11)).isoformat() + jobs[0]["next_run_at"] = stale + jobs[0]["last_run_at"] = (_hermes_now() - timedelta(minutes=1)).isoformat() + save_jobs(jobs) + + due = get_due_jobs() + assert [j["id"] for j in due] == [job["id"]], "long-execution job was skipped (perpetual-defer bug)" + # next_run_at fast-forwarded into the future (no burst of missed slots). + nxt = _ensure_aware(datetime.fromisoformat(get_job(job["id"])["next_run_at"])) + assert nxt > _hermes_now() + + + def test_stale_repeat_limited_job_consumes_one_run_on_catchup(self, tmp_cron_dir, monkeypatch): + """#33315 behavior note: a stale recurring job with a repeat.times limit + fires ONCE on catch-up and consumes one of its runs (it is no longer + silently skipped). Pins the documented repeat-count interaction so it + isn't changed accidentally.""" + from cron.jobs import _hermes_now + job = create_job(prompt="Limited", schedule="every 5m", repeat=3) + jobs = load_jobs() + jobs[0]["next_run_at"] = (_hermes_now() - timedelta(minutes=11)).isoformat() + jobs[0]["last_run_at"] = (_hermes_now() - timedelta(minutes=11)).isoformat() + save_jobs(jobs) + + # The stale job is returned to fire once (not skipped). + due = get_due_jobs() + assert [j["id"] for j in due] == [job["id"]] + # Simulate the run completing: mark_job_run increments completed. + mark_job_run(job["id"], True) + survived = get_job(job["id"]) + assert survived is not None, "job should survive (3 > 1 completed)" + assert survived["repeat"]["completed"] == 1 + def test_future_not_returned(self, tmp_cron_dir): create_job(prompt="Not yet", schedule="every 1h") due = get_due_jobs() @@ -911,10 +961,15 @@ class TestGetDueJobs: }] ) - # The wall-clock time has already passed, so this follows the existing - # stale-run fast-forward behavior instead of the timezone-migration - # repair path for future wall-clock runs. - assert get_due_jobs() == [] + # The wall-clock time has already passed, so this does NOT take the + # timezone-migration repair path (which is for still-future wall-clock + # runs). It falls through to the stale-grace path, which — since #33315 + # — runs the job once now and fast-forwards next_run_at (rather than + # skipping). The key assertion for THIS test is that the repaired + # next_run_at is the normal next cron occurrence, not the migration + # path's same-day rebase. + due = get_due_jobs() + assert [j["id"] for j in due] == ["cron-tz-missed"] # runs once now (#33315) repaired = datetime.fromisoformat(get_job("cron-tz-missed")["next_run_at"]) assert repaired == datetime(2026, 5, 26, 9, 0, 0, tzinfo=current_tz)