From 07424da76f60ce1efee5239e9d324a3069873494 Mon Sep 17 00:00:00 2001 From: annguyenNous Date: Sun, 21 Jun 2026 12:47:48 +0530 Subject: [PATCH] fix(cron): keep ticker alive on BaseException + heartbeat-aware status MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The in-process cron ticker (cron/scheduler_provider.py) caught only `Exception` and logged at DEBUG, so a `SystemExit`/`KeyboardInterrupt` raised from a misbehaving provider SDK or agent retry path killed the ticker thread silently. The gateway PROCESS stayed up, so `hermes cron status` — which only checks `find_gateway_pids()` — kept reporting "✓ jobs will fire automatically" while no jobs ever fired (#32612, #32895). This makes ticker death survivable and detectable: - The ticker loop now catches `BaseException` and logs at ERROR with a traceback, so a single bad tick no longer tears the thread down and the failure is visible in the gateway log. - The loop records a heartbeat (`cron/ticker_heartbeat`, epoch seconds) on startup and after every tick — best-effort, never raised into the loop. Both ticker entry points (the gateway and the desktop fallback in web_server.py) funnel through `InProcessCronScheduler.start`, so one heartbeat site covers both. - `hermes cron status` now reads the heartbeat age: if the gateway is running but the heartbeat is stale (> 200s, i.e. several missed ~60s ticks), it reports the ticker as STALLED and suggests a restart instead of falsely claiming jobs will fire. A missing heartbeat (older build / never ran) is treated as "unknown", not "dead". Adds tests for BaseException survival, per-iteration heartbeat recording, heartbeat round-trip/age, staleness detection, and silent-write-failure. Salvaged from #49660 (BaseException survival on current structure), extended with the heartbeat + honest-status reporting that the earlier (pre-refactor) watchdog PRs #35616 and #33849 proposed. Fixes #32612 Fixes #32895 Co-authored-by: banditburai Co-authored-by: sweetcornna <96944678+sweetcornna@users.noreply.github.com> --- cron/jobs.py | 87 ++++++++++++ cron/scheduler.py | 6 + cron/scheduler_provider.py | 21 ++- hermes_cli/cron.py | 44 +++++- tests/cron/test_scheduler_provider.py | 189 ++++++++++++++++++++++++++ 5 files changed, 343 insertions(+), 4 deletions(-) diff --git a/cron/jobs.py b/cron/jobs.py index 2f44608d649..22e3c595a18 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -12,6 +12,7 @@ import logging import shutil import tempfile import threading +import time import os import re import uuid @@ -51,6 +52,20 @@ except ImportError: HERMES_DIR = get_hermes_home().resolve() CRON_DIR = HERMES_DIR / "cron" JOBS_FILE = CRON_DIR / "jobs.json" +# Heartbeat file the in-process ticker touches on every loop iteration. The +# gateway process and the (separate) ``hermes cron status`` process share it +# so status can tell whether the ticker THREAD is alive, not just whether the +# gateway PROCESS exists — a ticker that dies silently inside a live gateway +# would otherwise report healthy (#32612, #32895). +TICKER_HEARTBEAT_FILE = CRON_DIR / "ticker_heartbeat" +# Last tick that completed WITHOUT raising. Distinguishing this from the plain +# heartbeat lets status detect a ticker that is alive but failing every tick. +TICKER_SUCCESS_FILE = CRON_DIR / "ticker_last_success" +# Default ticker loop interval (seconds). The single source of truth shared by +# the in-process ticker (cron/scheduler_provider.py) and the staleness +# threshold in `hermes cron status` (hermes_cli/cron.py), so the two never +# drift apart. +TICKER_INTERVAL_SECONDS = 60 # In-process lock protecting load_jobs→modify→save_jobs cycles. # Required when tick() runs jobs in parallel threads — without this, @@ -499,6 +514,78 @@ def compute_next_run(schedule: Dict[str, Any], last_run_at: Optional[str] = None return None +# ============================================================================= +# Ticker heartbeat (liveness signal for `hermes cron status`) +# ============================================================================= + +def _atomic_write_epoch(path: Path) -> None: + """Atomically write the current epoch time to ``path``. + + Uses the same tmpfile + ``atomic_replace`` pattern as ``save_jobs`` so a + concurrent reader in another process (``hermes cron status``) never sees a + torn/truncated file. Best-effort: failures are swallowed by callers. + """ + ensure_dirs() + fd, tmp_path = tempfile.mkstemp(dir=str(CRON_DIR), suffix=".tmp", prefix=".hb_") + try: + with os.fdopen(fd, "w", encoding="utf-8") as f: + f.write(str(time.time())) + f.flush() + os.fsync(f.fileno()) + atomic_replace(tmp_path, path) + except BaseException: + try: + os.unlink(tmp_path) + except OSError: + pass + raise + + +def record_ticker_heartbeat(success: bool = False) -> None: + """Record a ticker liveness signal, and optionally a successful-tick signal. + + The ticker calls this once per loop iteration. ``success=True`` additionally + bumps the *last successful tick* marker. We track two distinct signals so + `hermes cron status` can tell a thread that is merely *alive and looping* + (heartbeat fresh, success stale) from one that is actually *firing jobs* + (both fresh) — a ticker stuck failing every tick would otherwise keep the + plain heartbeat fresh and falsely report healthy (#32612, #32895). + + Best-effort: a write failure must never disrupt the tick loop. + """ + try: + _atomic_write_epoch(TICKER_HEARTBEAT_FILE) + except Exception: + pass + if success: + try: + _atomic_write_epoch(TICKER_SUCCESS_FILE) + except Exception: + pass + + +def _epoch_file_age(path: Path) -> Optional[float]: + try: + raw = path.read_text(encoding="utf-8").strip() + return max(0.0, time.time() - float(raw)) + except Exception: + return None + + +def get_ticker_heartbeat_age() -> Optional[float]: + """Seconds since the ticker loop last iterated, or None if unknown. + + None = heartbeat file missing/unreadable (older build, never ran, or a + torn read). Callers treat None as "cannot determine", not "dead". + """ + return _epoch_file_age(TICKER_HEARTBEAT_FILE) + + +def get_ticker_success_age() -> Optional[float]: + """Seconds since the ticker last completed a tick WITHOUT raising, or None.""" + return _epoch_file_age(TICKER_SUCCESS_FILE) + + # ============================================================================= # Job CRUD Operations # ============================================================================= diff --git a/cron/scheduler.py b/cron/scheduler.py index 0956528b132..98a4d568cc9 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -2332,6 +2332,12 @@ def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> i def _on_done(_f: concurrent.futures.Future) -> None: _remaining[0] -= 1 + try: + _exc = _f.exception() + if _exc is not None: + logger.error("Cron job future failed in async mode: %s", _exc, exc_info=(type(_exc), _exc, _exc.__traceback__)) + except Exception: + pass if _remaining[0] <= 0: _sweep_mcp_orphans() diff --git a/cron/scheduler_provider.py b/cron/scheduler_provider.py index 50bca6b892b..6b5c838617a 100644 --- a/cron/scheduler_provider.py +++ b/cron/scheduler_provider.py @@ -166,12 +166,29 @@ class InProcessCronScheduler(CronScheduler): def start(self, stop_event, *, adapters=None, loop=None, interval=60): import logging from cron.scheduler import tick as cron_tick + from cron.jobs import record_ticker_heartbeat logger = logging.getLogger("cron.scheduler_provider") logger.info("In-process cron scheduler started (interval=%ds)", interval) + # Heartbeat once before the first sleep so `hermes cron status` sees a + # live ticker immediately after startup, not only after the first tick. + record_ticker_heartbeat() while not stop_event.is_set(): + ok = False try: cron_tick(verbose=False, adapters=adapters, loop=loop, sync=False) - except Exception as e: - logger.debug("Cron tick error: %s", e) + ok = True + except BaseException as e: + # Catch BaseException (not just Exception) so a SystemExit from + # a misbehaving provider SDK / agent retry path does not kill + # the ticker thread silently (#32612). KeyboardInterrupt is + # intentionally caught here too — gateway shutdown is driven by + # stop_event (set by the main thread's signal handler), not by + # an exception in this daemon thread, so swallowing it and + # re-checking stop_event keeps shutdown clean. + logger.error("Cron tick error: %s", e, exc_info=True) + # Record liveness every iteration; bump the success marker only on a + # clean tick, so status can tell "alive but failing every tick" from + # "actually firing jobs" (#32612, #32895). + record_ticker_heartbeat(success=ok) stop_event.wait(interval) diff --git a/hermes_cli/cron.py b/hermes_cli/cron.py index 86f8e6b09e2..f15181deed0 100644 --- a/hermes_cli/cron.py +++ b/hermes_cli/cron.py @@ -160,8 +160,48 @@ def cron_status(): pids = find_gateway_pids() if pids: - print(color("✓ Gateway is running — cron jobs will fire automatically", Colors.GREEN)) - print(f" PID: {', '.join(map(str, pids))}") + # The gateway PROCESS is alive — but the cron ticker THREAD inside it + # can die silently, or stay alive while every tick fails. Check both + # the liveness heartbeat and the last-successful-tick marker so we + # don't report "will fire" when the ticker is dead or failing + # (#32612, #32895). + from cron.jobs import ( + get_ticker_heartbeat_age, + get_ticker_success_age, + TICKER_INTERVAL_SECONDS, + ) + + # Allow ~3 missed ticker iterations (+ a little slack) before declaring + # trouble. Derived from the shared interval constant so this threshold + # tracks the ticker cadence instead of assuming a hardcoded 60s. + STALE_AFTER = TICKER_INTERVAL_SECONDS * 3 + 20 # = 200s at the 60s default + hb_age = get_ticker_heartbeat_age() + ok_age = get_ticker_success_age() + + if hb_age is not None and hb_age > STALE_AFTER: + # No heartbeat at all → the ticker thread is gone. + print(color( + "⚠ Gateway is running but the cron ticker looks STALLED — " + f"no heartbeat for {int(hb_age)}s (expected every ~60s).", + Colors.YELLOW, + )) + print(f" PID: {', '.join(map(str, pids))}") + print(" Cron jobs may NOT be firing. Restart: hermes gateway restart") + elif hb_age is not None and ok_age is not None and ok_age > STALE_AFTER: + # Loop is alive (fresh heartbeat) but no tick has SUCCEEDED in a + # long time → ticks are failing every iteration. + print(color( + "⚠ Gateway and cron ticker are running, but no tick has " + f"succeeded in {int(ok_age)}s — ticks may be failing.", + Colors.YELLOW, + )) + print(f" PID: {', '.join(map(str, pids))}") + print(" Check the gateway log for 'Cron tick error'.") + else: + print(color("✓ Gateway is running — cron jobs will fire automatically", Colors.GREEN)) + print(f" PID: {', '.join(map(str, pids))}") + if hb_age is not None: + print(f" Ticker heartbeat: {int(hb_age)}s ago") else: print(color("✗ Gateway is not running — cron jobs will NOT fire", Colors.RED)) print() diff --git a/tests/cron/test_scheduler_provider.py b/tests/cron/test_scheduler_provider.py index 2b2e159e2a3..d209af4ef5d 100644 --- a/tests/cron/test_scheduler_provider.py +++ b/tests/cron/test_scheduler_provider.py @@ -332,3 +332,192 @@ def test_fire_due_missing_job_does_not_run(monkeypatch): assert InProcessCronScheduler().fire_due("gone") is False assert ran == [] + + +# ── F2a: ticker liveness — survival, heartbeat, honest status (#32612, #32895) ── + + +def test_ticker_survives_baseexception_from_tick(): + """A BaseException (e.g. SystemExit from a provider SDK) raised by tick() + must NOT kill the ticker loop — it logs and keeps looping (#32612).""" + from cron.scheduler_provider import InProcessCronScheduler + + calls = [] + + def _boom(*a, **k): + calls.append(1) + if len(calls) == 1: + raise SystemExit("provider SDK called sys.exit") + return 0 + + stop = threading.Event() + prov = InProcessCronScheduler() + with patch("cron.scheduler.tick", side_effect=_boom), \ + patch("cron.jobs.record_ticker_heartbeat"): + t = threading.Thread(target=prov.start, args=(stop,), kwargs={"interval": 0}, daemon=True) + t.start() + time.sleep(0.2) + stop.set() + t.join(timeout=5) + + assert not t.is_alive(), "ticker thread died on BaseException instead of surviving" + assert len(calls) >= 2, "ticker did not keep ticking after the BaseException" + + +def test_ticker_records_heartbeat_each_iteration(): + """The loop records a liveness heartbeat on start and after each tick, + bumping the success marker only on a clean tick.""" + from cron.scheduler_provider import InProcessCronScheduler + + beats = [] # (success,) per call + stop = threading.Event() + prov = InProcessCronScheduler() + with patch("cron.scheduler.tick", side_effect=lambda *a, **k: 0), \ + patch("cron.jobs.record_ticker_heartbeat", + side_effect=lambda success=False: beats.append(success)): + t = threading.Thread(target=prov.start, args=(stop,), kwargs={"interval": 0}, daemon=True) + t.start() + time.sleep(0.2) + stop.set() + t.join(timeout=5) + + # one pre-loop liveness beat (success=False) + post-tick beats with success=True + assert len(beats) >= 2, "ticker did not record heartbeats" + assert beats[0] is False, "pre-loop beat should be liveness-only" + assert any(b is True for b in beats[1:]), "successful tick did not bump success marker" + + +def test_failing_tick_records_liveness_but_not_success(): + """A tick that raises bumps the liveness heartbeat but NOT the success + marker — so status can distinguish 'alive but failing' from 'firing'.""" + from cron.scheduler_provider import InProcessCronScheduler + + beats = [] + stop = threading.Event() + prov = InProcessCronScheduler() + with patch("cron.scheduler.tick", side_effect=RuntimeError("every tick fails")), \ + patch("cron.jobs.record_ticker_heartbeat", + side_effect=lambda success=False: beats.append(success)): + t = threading.Thread(target=prov.start, args=(stop,), kwargs={"interval": 0}, daemon=True) + t.start() + time.sleep(0.2) + stop.set() + t.join(timeout=5) + + # every post-tick beat must be success=False (ticks always failed) + assert len(beats) >= 2 + assert all(b is False for b in beats), "a failing tick wrongly bumped the success marker" + + +def test_heartbeat_roundtrip_and_age(tmp_path, monkeypatch): + """record_ticker_heartbeat writes fresh timestamps atomically; the age + getters read them back as small positive ages.""" + import cron.jobs as jobs + + cron_dir = tmp_path / "cron" + monkeypatch.setattr(jobs, "CRON_DIR", cron_dir) + monkeypatch.setattr(jobs, "OUTPUT_DIR", cron_dir / "output") + monkeypatch.setattr(jobs, "TICKER_HEARTBEAT_FILE", cron_dir / "ticker_heartbeat") + monkeypatch.setattr(jobs, "TICKER_SUCCESS_FILE", cron_dir / "ticker_last_success") + + # No files yet -> unknown (None), NOT "dead" + assert jobs.get_ticker_heartbeat_age() is None + assert jobs.get_ticker_success_age() is None + + # liveness-only: heartbeat set, success still unknown + jobs.record_ticker_heartbeat(success=False) + hb = jobs.get_ticker_heartbeat_age() + assert hb is not None and 0.0 <= hb < 5.0 + assert jobs.get_ticker_success_age() is None + + # success: both set + jobs.record_ticker_heartbeat(success=True) + ok = jobs.get_ticker_success_age() + assert ok is not None and 0.0 <= ok < 5.0 + + +def test_heartbeat_age_detects_staleness(tmp_path, monkeypatch): + """A heartbeat written far in the past reads back as a large age.""" + import cron.jobs as jobs + + cron_dir = tmp_path / "cron" + cron_dir.mkdir(parents=True) + hb = cron_dir / "ticker_heartbeat" + monkeypatch.setattr(jobs, "CRON_DIR", cron_dir) + monkeypatch.setattr(jobs, "TICKER_HEARTBEAT_FILE", hb) + + import time as _t + hb.write_text(str(_t.time() - 10_000), encoding="utf-8") + age = jobs.get_ticker_heartbeat_age() + assert age is not None and age > 9_000 + + +def test_heartbeat_write_failure_is_silent(tmp_path, monkeypatch): + """A real atomic-write failure must be swallowed AND leave no temp file. + + Point CRON_DIR at a path that cannot be created (its parent is a regular + file), so ensure_dirs()/mkstemp inside _atomic_write_epoch genuinely fail. + record_ticker_heartbeat must not raise, and no stray .hb_*.tmp may leak. + """ + import cron.jobs as jobs + + blocker = tmp_path / "not_a_dir" + blocker.write_text("i am a file, not a directory") + bad_cron_dir = blocker / "cron" # parent is a file -> mkdir/mkstemp fail + monkeypatch.setattr(jobs, "CRON_DIR", bad_cron_dir) + monkeypatch.setattr(jobs, "OUTPUT_DIR", bad_cron_dir / "output") + monkeypatch.setattr(jobs, "TICKER_HEARTBEAT_FILE", bad_cron_dir / "ticker_heartbeat") + monkeypatch.setattr(jobs, "TICKER_SUCCESS_FILE", bad_cron_dir / "ticker_last_success") + + jobs.record_ticker_heartbeat(success=True) # must not raise + + # The write never succeeded, so no heartbeat is recorded... + assert jobs.get_ticker_heartbeat_age() is None + # ...and no stray temp file leaked anywhere under tmp_path. + assert not list(tmp_path.rglob(".hb_*.tmp")), "atomic write leaked a temp file on failure" + + +def test_cron_status_reports_alive_but_failing(tmp_path, monkeypatch, capsys): + """cron_status warns when the ticker is alive (fresh heartbeat) but no tick + has succeeded recently (#32612: alive-but-failing must not look healthy).""" + import cron.jobs as jobs + from hermes_cli import cron as cron_cli + + monkeypatch.setattr("hermes_cli.gateway.find_gateway_pids", lambda: [4321]) + monkeypatch.setattr(jobs, "get_ticker_heartbeat_age", lambda: 5.0) # fresh + monkeypatch.setattr(jobs, "get_ticker_success_age", lambda: 9_999.0) # stale + monkeypatch.setattr("cron.jobs.list_jobs", lambda **k: []) + + cron_cli.cron_status() + out = capsys.readouterr().out + assert "no tick has succeeded" in out + assert "will fire automatically" not in out + + +def test_cron_status_healthy_when_both_fresh(tmp_path, monkeypatch, capsys): + import cron.jobs as jobs + from hermes_cli import cron as cron_cli + + monkeypatch.setattr("hermes_cli.gateway.find_gateway_pids", lambda: [4321]) + monkeypatch.setattr(jobs, "get_ticker_heartbeat_age", lambda: 5.0) + monkeypatch.setattr(jobs, "get_ticker_success_age", lambda: 5.0) + monkeypatch.setattr("cron.jobs.list_jobs", lambda **k: []) + + cron_cli.cron_status() + out = capsys.readouterr().out + assert "will fire automatically" in out + + +def test_cron_status_reports_stalled_when_no_heartbeat(tmp_path, monkeypatch, capsys): + import cron.jobs as jobs + from hermes_cli import cron as cron_cli + + monkeypatch.setattr("hermes_cli.gateway.find_gateway_pids", lambda: [4321]) + monkeypatch.setattr(jobs, "get_ticker_heartbeat_age", lambda: 9_999.0) # dead + monkeypatch.setattr(jobs, "get_ticker_success_age", lambda: 9_999.0) + monkeypatch.setattr("cron.jobs.list_jobs", lambda **k: []) + + cron_cli.cron_status() + out = capsys.readouterr().out + assert "STALLED" in out + assert "will fire automatically" not in out