From 6777a6bd67ccabd92455845736b17150a96c6a14 Mon Sep 17 00:00:00 2001 From: liuhao1024 Date: Sun, 21 Jun 2026 14:06:30 +0530 Subject: [PATCH] fix(cron): run missed-grace jobs once instead of deferring forever MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a recurring job's execution time exceeds `interval + grace`, the scheduler entered a perpetual "missed → fast-forward → skip" loop and the job effectively never ran again. A real job (`hermes-upstream-contribution`) logged 42 consecutive "missed" events over 9 hours without executing once. Timeline (5-min interval, 150s grace, ~15-min execution): 14:00 due → advance next_run_at→14:05 → run (blocks 15 min) 14:15 finishes 14:16 tick: next_run_at=14:05, elapsed 660s > grace 150s → "missed!" → fast-forward to 14:21 → continue (SKIP) → does NOT run ... repeats forever for any job whose runtime > interval+grace. The `continue` (skip execution) in `_get_due_jobs_locked` was designed to prevent burst-catchup after *gateway downtime* — don't run 6 missed instances of a 30-min job on restart. But it wrongly applied to a job that missed its slot because it was *still running*, not because the gateway was down. Fix: keep the fast-forward (so accumulated missed slots are still collapsed to a single next slot — no burst) but fall through to `due.append(job)` so the job runs ONCE now. The log message is updated to be honest about the new behavior ("Running now; next run fast-forwarded to: ..."). Behavior note: a recurring job missed during gateway downtime now also fires once immediately on restart (rather than waiting for its next natural slot). This is the intended trade-off — the same "run once, don't burst" rule now applies uniformly to both downtime-misses and long-execution-misses. Salvaged from #33318 by @liuhao1024 (authorship preserved). Also addresses the diagnosis in #33361 (@agent-trivi), which proposed the same one-line fix. Tests: updates `test_stale_past_due_skipped` → `test_stale_past_due_runs_once_and_fast_forwards` (the old test encoded the skip behavior); adds `test_long_execution_does_not_perpetually_defer` as a direct regression for the production loop; updates the F2e timezone test that relied on the old skip path. Full tests/cron/ suite: 510 passed. Fixes #33315 Co-authored-by: liuhao1024 --- cron/jobs.py | 33 +++++++++++++------ tests/cron/test_jobs.py | 71 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 87 insertions(+), 17 deletions(-) diff --git a/cron/jobs.py b/cron/jobs.py index 0bf15e0a29d..ed0ac61fb21 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -1240,10 +1240,16 @@ def claim_job_for_fire(job_id: str, *, claim_ttl_seconds: int = 300) -> bool: def get_due_jobs() -> List[Dict[str, Any]]: """Get all jobs that are due to run now. - For recurring jobs (cron/interval), if the scheduled time is stale - (more than one period in the past, e.g. because the gateway was down), - the job is fast-forwarded to the next future run instead of firing - immediately. This prevents a burst of missed jobs on gateway restart. + For recurring jobs (cron/interval), if the scheduled time is stale (more + than one period in the past, e.g. because the gateway was down OR because a + long-running previous execution overran the interval), the accumulated + missed runs are collapsed — ``next_run_at`` is fast-forwarded to the next + future occurrence so a backlog does NOT burst-fire on restart — but the job + still fires ONCE now. This prevents the perpetual-defer loop (#33315) where + a job whose runtime exceeds ``interval + grace`` would be skipped forever. + + Note: firing once on catch-up flows through ``mark_job_run``, so a job with + a ``repeat.times`` limit consumes one of its runs on that catch-up fire. """ with _jobs_lock(): return _get_due_jobs_locked() @@ -1351,25 +1357,34 @@ def _get_due_jobs_locked() -> List[Dict[str, Any]]: # the next future occurrence instead of firing a stale run. grace = _compute_grace_seconds(schedule) if kind in {"cron", "interval"} and (now - next_run_dt).total_seconds() > grace: - # Job is past its catch-up grace window — this is a stale missed run. - # Grace scales with schedule period: daily=2h, hourly=30m, 10min=5m. + # Job is past its catch-up grace window — skip accumulated + # missed runs but still execute once now to avoid deferring + # indefinitely (e.g. a long-running job just finished). new_next = compute_next_run(schedule, now.isoformat()) if new_next: logger.info( "Job '%s' missed its scheduled time (%s, grace=%ds). " - "Fast-forwarding to next run: %s", + "Running now; next run provisionally set to: %s " + "(re-anchored on completion)", job.get("name", job["id"]), next_run, grace, new_next, ) - # Update the job in storage + # Persist the fast-forward to storage now (skip accumulated + # slots). In the built-in ticker path this is shortly + # overwritten by advance_next_run + mark_job_run, but it is + # NOT redundant: it (a) protects the crash window between + # here and mark_job_run, and (b) covers the external + # fire_due provider path, which does not call + # advance_next_run. mark_job_run re-anchors next_run_at off + # the actual completion time, so this value is provisional. for rj in raw_jobs: if rj["id"] == job["id"]: rj["next_run_at"] = new_next needs_save = True break - continue # Skip this run + # Fall through to due.append(job) — execute once now due.append(job) diff --git a/tests/cron/test_jobs.py b/tests/cron/test_jobs.py index f54041d0573..b554d19983b 100644 --- a/tests/cron/test_jobs.py +++ b/tests/cron/test_jobs.py @@ -685,10 +685,11 @@ class TestGetDueJobs: assert len(due) == 1 assert due[0]["id"] == job["id"] - def test_stale_past_due_skipped(self, tmp_cron_dir): - """Recurring jobs past their dynamic grace window are fast-forwarded, not fired. + def test_stale_past_due_runs_once_and_fast_forwards(self, tmp_cron_dir): + """Recurring jobs past their grace window run once now and fast-forward next_run_at. For an hourly job, grace = 30 min. Setting 35 min late exceeds the window. + The job should be returned as due (execute once) with next_run_at in the future. """ job = create_job(prompt="Stale", schedule="every 1h") # Force next_run_at to 35 minutes ago (beyond the 30-min grace for hourly) @@ -697,13 +698,62 @@ class TestGetDueJobs: save_jobs(jobs) due = get_due_jobs() - assert len(due) == 0 - # next_run_at should be fast-forwarded to the future + # Job is returned as due — execute once now instead of skipping + assert len(due) == 1 + assert due[0]["id"] == job["id"] + # next_run_at should be fast-forwarded to the future (accumulated slots skipped) updated = get_job(job["id"]) from cron.jobs import _ensure_aware, _hermes_now next_dt = _ensure_aware(datetime.fromisoformat(updated["next_run_at"])) assert next_dt > _hermes_now() + + def test_long_execution_does_not_perpetually_defer(self, tmp_cron_dir, monkeypatch): + """#33315: a recurring job whose runtime exceeds interval+grace must still + run once when the tick comes back, not skip forever. + + Reproduces the production loop: a 5-min interval job whose previous run + overran the interval, leaving next_run_at ~11 min in the past — beyond + the 150s grace for a 5m interval. The job must be returned as due (run + once) AND have next_run_at fast-forwarded (so accumulated missed slots + don't all fire).""" + from cron.jobs import _ensure_aware, _hermes_now + job = create_job(prompt="Long job", schedule="every 5m") + jobs = load_jobs() + # 11 minutes ago: > grace (150s for a 5m interval) — the "still running" miss. + stale = (_hermes_now() - timedelta(minutes=11)).isoformat() + jobs[0]["next_run_at"] = stale + jobs[0]["last_run_at"] = (_hermes_now() - timedelta(minutes=1)).isoformat() + save_jobs(jobs) + + due = get_due_jobs() + assert [j["id"] for j in due] == [job["id"]], "long-execution job was skipped (perpetual-defer bug)" + # next_run_at fast-forwarded into the future (no burst of missed slots). + nxt = _ensure_aware(datetime.fromisoformat(get_job(job["id"])["next_run_at"])) + assert nxt > _hermes_now() + + + def test_stale_repeat_limited_job_consumes_one_run_on_catchup(self, tmp_cron_dir, monkeypatch): + """#33315 behavior note: a stale recurring job with a repeat.times limit + fires ONCE on catch-up and consumes one of its runs (it is no longer + silently skipped). Pins the documented repeat-count interaction so it + isn't changed accidentally.""" + from cron.jobs import _hermes_now + job = create_job(prompt="Limited", schedule="every 5m", repeat=3) + jobs = load_jobs() + jobs[0]["next_run_at"] = (_hermes_now() - timedelta(minutes=11)).isoformat() + jobs[0]["last_run_at"] = (_hermes_now() - timedelta(minutes=11)).isoformat() + save_jobs(jobs) + + # The stale job is returned to fire once (not skipped). + due = get_due_jobs() + assert [j["id"] for j in due] == [job["id"]] + # Simulate the run completing: mark_job_run increments completed. + mark_job_run(job["id"], True) + survived = get_job(job["id"]) + assert survived is not None, "job should survive (3 > 1 completed)" + assert survived["repeat"]["completed"] == 1 + def test_future_not_returned(self, tmp_cron_dir): create_job(prompt="Not yet", schedule="every 1h") due = get_due_jobs() @@ -911,10 +961,15 @@ class TestGetDueJobs: }] ) - # The wall-clock time has already passed, so this follows the existing - # stale-run fast-forward behavior instead of the timezone-migration - # repair path for future wall-clock runs. - assert get_due_jobs() == [] + # The wall-clock time has already passed, so this does NOT take the + # timezone-migration repair path (which is for still-future wall-clock + # runs). It falls through to the stale-grace path, which — since #33315 + # — runs the job once now and fast-forwards next_run_at (rather than + # skipping). The key assertion for THIS test is that the repaired + # next_run_at is the normal next cron occurrence, not the migration + # path's same-day rebase. + due = get_due_jobs() + assert [j["id"] for j in due] == ["cron-tz-missed"] # runs once now (#33315) repaired = datetime.fromisoformat(get_job("cron-tz-missed")["next_run_at"]) assert repaired == datetime(2026, 5, 26, 9, 0, 0, tzinfo=current_tz)