mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-23 10:42:00 +00:00
fix(cron): keep ticker alive on BaseException + heartbeat-aware status
The in-process cron ticker (cron/scheduler_provider.py) caught only `Exception` and logged at DEBUG, so a `SystemExit`/`KeyboardInterrupt` raised from a misbehaving provider SDK or agent retry path killed the ticker thread silently. The gateway PROCESS stayed up, so `hermes cron status` — which only checks `find_gateway_pids()` — kept reporting "✓ jobs will fire automatically" while no jobs ever fired (#32612, #32895). This makes ticker death survivable and detectable: - The ticker loop now catches `BaseException` and logs at ERROR with a traceback, so a single bad tick no longer tears the thread down and the failure is visible in the gateway log. - The loop records a heartbeat (`cron/ticker_heartbeat`, epoch seconds) on startup and after every tick — best-effort, never raised into the loop. Both ticker entry points (the gateway and the desktop fallback in web_server.py) funnel through `InProcessCronScheduler.start`, so one heartbeat site covers both. - `hermes cron status` now reads the heartbeat age: if the gateway is running but the heartbeat is stale (> 200s, i.e. several missed ~60s ticks), it reports the ticker as STALLED and suggests a restart instead of falsely claiming jobs will fire. A missing heartbeat (older build / never ran) is treated as "unknown", not "dead". Adds tests for BaseException survival, per-iteration heartbeat recording, heartbeat round-trip/age, staleness detection, and silent-write-failure. Salvaged from #49660 (BaseException survival on current structure), extended with the heartbeat + honest-status reporting that the earlier (pre-refactor) watchdog PRs #35616 and #33849 proposed. Fixes #32612 Fixes #32895 Co-authored-by: banditburai <promptsiren@gmail.com> Co-authored-by: sweetcornna <96944678+sweetcornna@users.noreply.github.com>
This commit is contained in:
parent
35752fc3a5
commit
07424da76f
5 changed files with 343 additions and 4 deletions
87
cron/jobs.py
87
cron/jobs.py
|
|
@ -12,6 +12,7 @@ import logging
|
|||
import shutil
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import os
|
||||
import re
|
||||
import uuid
|
||||
|
|
@ -51,6 +52,20 @@ except ImportError:
|
|||
HERMES_DIR = get_hermes_home().resolve()
|
||||
CRON_DIR = HERMES_DIR / "cron"
|
||||
JOBS_FILE = CRON_DIR / "jobs.json"
|
||||
# Heartbeat file the in-process ticker touches on every loop iteration. The
|
||||
# gateway process and the (separate) ``hermes cron status`` process share it
|
||||
# so status can tell whether the ticker THREAD is alive, not just whether the
|
||||
# gateway PROCESS exists — a ticker that dies silently inside a live gateway
|
||||
# would otherwise report healthy (#32612, #32895).
|
||||
TICKER_HEARTBEAT_FILE = CRON_DIR / "ticker_heartbeat"
|
||||
# Last tick that completed WITHOUT raising. Distinguishing this from the plain
|
||||
# heartbeat lets status detect a ticker that is alive but failing every tick.
|
||||
TICKER_SUCCESS_FILE = CRON_DIR / "ticker_last_success"
|
||||
# Default ticker loop interval (seconds). The single source of truth shared by
|
||||
# the in-process ticker (cron/scheduler_provider.py) and the staleness
|
||||
# threshold in `hermes cron status` (hermes_cli/cron.py), so the two never
|
||||
# drift apart.
|
||||
TICKER_INTERVAL_SECONDS = 60
|
||||
|
||||
# In-process lock protecting load_jobs→modify→save_jobs cycles.
|
||||
# Required when tick() runs jobs in parallel threads — without this,
|
||||
|
|
@ -499,6 +514,78 @@ def compute_next_run(schedule: Dict[str, Any], last_run_at: Optional[str] = None
|
|||
return None
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Ticker heartbeat (liveness signal for `hermes cron status`)
|
||||
# =============================================================================
|
||||
|
||||
def _atomic_write_epoch(path: Path) -> None:
|
||||
"""Atomically write the current epoch time to ``path``.
|
||||
|
||||
Uses the same tmpfile + ``atomic_replace`` pattern as ``save_jobs`` so a
|
||||
concurrent reader in another process (``hermes cron status``) never sees a
|
||||
torn/truncated file. Best-effort: failures are swallowed by callers.
|
||||
"""
|
||||
ensure_dirs()
|
||||
fd, tmp_path = tempfile.mkstemp(dir=str(CRON_DIR), suffix=".tmp", prefix=".hb_")
|
||||
try:
|
||||
with os.fdopen(fd, "w", encoding="utf-8") as f:
|
||||
f.write(str(time.time()))
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
atomic_replace(tmp_path, path)
|
||||
except BaseException:
|
||||
try:
|
||||
os.unlink(tmp_path)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
|
||||
|
||||
def record_ticker_heartbeat(success: bool = False) -> None:
|
||||
"""Record a ticker liveness signal, and optionally a successful-tick signal.
|
||||
|
||||
The ticker calls this once per loop iteration. ``success=True`` additionally
|
||||
bumps the *last successful tick* marker. We track two distinct signals so
|
||||
`hermes cron status` can tell a thread that is merely *alive and looping*
|
||||
(heartbeat fresh, success stale) from one that is actually *firing jobs*
|
||||
(both fresh) — a ticker stuck failing every tick would otherwise keep the
|
||||
plain heartbeat fresh and falsely report healthy (#32612, #32895).
|
||||
|
||||
Best-effort: a write failure must never disrupt the tick loop.
|
||||
"""
|
||||
try:
|
||||
_atomic_write_epoch(TICKER_HEARTBEAT_FILE)
|
||||
except Exception:
|
||||
pass
|
||||
if success:
|
||||
try:
|
||||
_atomic_write_epoch(TICKER_SUCCESS_FILE)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _epoch_file_age(path: Path) -> Optional[float]:
|
||||
try:
|
||||
raw = path.read_text(encoding="utf-8").strip()
|
||||
return max(0.0, time.time() - float(raw))
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def get_ticker_heartbeat_age() -> Optional[float]:
|
||||
"""Seconds since the ticker loop last iterated, or None if unknown.
|
||||
|
||||
None = heartbeat file missing/unreadable (older build, never ran, or a
|
||||
torn read). Callers treat None as "cannot determine", not "dead".
|
||||
"""
|
||||
return _epoch_file_age(TICKER_HEARTBEAT_FILE)
|
||||
|
||||
|
||||
def get_ticker_success_age() -> Optional[float]:
|
||||
"""Seconds since the ticker last completed a tick WITHOUT raising, or None."""
|
||||
return _epoch_file_age(TICKER_SUCCESS_FILE)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Job CRUD Operations
|
||||
# =============================================================================
|
||||
|
|
|
|||
|
|
@ -2332,6 +2332,12 @@ def tick(verbose: bool = True, adapters=None, loop=None, sync: bool = True) -> i
|
|||
|
||||
def _on_done(_f: concurrent.futures.Future) -> None:
|
||||
_remaining[0] -= 1
|
||||
try:
|
||||
_exc = _f.exception()
|
||||
if _exc is not None:
|
||||
logger.error("Cron job future failed in async mode: %s", _exc, exc_info=(type(_exc), _exc, _exc.__traceback__))
|
||||
except Exception:
|
||||
pass
|
||||
if _remaining[0] <= 0:
|
||||
_sweep_mcp_orphans()
|
||||
|
||||
|
|
|
|||
|
|
@ -166,12 +166,29 @@ class InProcessCronScheduler(CronScheduler):
|
|||
def start(self, stop_event, *, adapters=None, loop=None, interval=60):
|
||||
import logging
|
||||
from cron.scheduler import tick as cron_tick
|
||||
from cron.jobs import record_ticker_heartbeat
|
||||
|
||||
logger = logging.getLogger("cron.scheduler_provider")
|
||||
logger.info("In-process cron scheduler started (interval=%ds)", interval)
|
||||
# Heartbeat once before the first sleep so `hermes cron status` sees a
|
||||
# live ticker immediately after startup, not only after the first tick.
|
||||
record_ticker_heartbeat()
|
||||
while not stop_event.is_set():
|
||||
ok = False
|
||||
try:
|
||||
cron_tick(verbose=False, adapters=adapters, loop=loop, sync=False)
|
||||
except Exception as e:
|
||||
logger.debug("Cron tick error: %s", e)
|
||||
ok = True
|
||||
except BaseException as e:
|
||||
# Catch BaseException (not just Exception) so a SystemExit from
|
||||
# a misbehaving provider SDK / agent retry path does not kill
|
||||
# the ticker thread silently (#32612). KeyboardInterrupt is
|
||||
# intentionally caught here too — gateway shutdown is driven by
|
||||
# stop_event (set by the main thread's signal handler), not by
|
||||
# an exception in this daemon thread, so swallowing it and
|
||||
# re-checking stop_event keeps shutdown clean.
|
||||
logger.error("Cron tick error: %s", e, exc_info=True)
|
||||
# Record liveness every iteration; bump the success marker only on a
|
||||
# clean tick, so status can tell "alive but failing every tick" from
|
||||
# "actually firing jobs" (#32612, #32895).
|
||||
record_ticker_heartbeat(success=ok)
|
||||
stop_event.wait(interval)
|
||||
|
|
|
|||
|
|
@ -160,8 +160,48 @@ def cron_status():
|
|||
|
||||
pids = find_gateway_pids()
|
||||
if pids:
|
||||
print(color("✓ Gateway is running — cron jobs will fire automatically", Colors.GREEN))
|
||||
print(f" PID: {', '.join(map(str, pids))}")
|
||||
# The gateway PROCESS is alive — but the cron ticker THREAD inside it
|
||||
# can die silently, or stay alive while every tick fails. Check both
|
||||
# the liveness heartbeat and the last-successful-tick marker so we
|
||||
# don't report "will fire" when the ticker is dead or failing
|
||||
# (#32612, #32895).
|
||||
from cron.jobs import (
|
||||
get_ticker_heartbeat_age,
|
||||
get_ticker_success_age,
|
||||
TICKER_INTERVAL_SECONDS,
|
||||
)
|
||||
|
||||
# Allow ~3 missed ticker iterations (+ a little slack) before declaring
|
||||
# trouble. Derived from the shared interval constant so this threshold
|
||||
# tracks the ticker cadence instead of assuming a hardcoded 60s.
|
||||
STALE_AFTER = TICKER_INTERVAL_SECONDS * 3 + 20 # = 200s at the 60s default
|
||||
hb_age = get_ticker_heartbeat_age()
|
||||
ok_age = get_ticker_success_age()
|
||||
|
||||
if hb_age is not None and hb_age > STALE_AFTER:
|
||||
# No heartbeat at all → the ticker thread is gone.
|
||||
print(color(
|
||||
"⚠ Gateway is running but the cron ticker looks STALLED — "
|
||||
f"no heartbeat for {int(hb_age)}s (expected every ~60s).",
|
||||
Colors.YELLOW,
|
||||
))
|
||||
print(f" PID: {', '.join(map(str, pids))}")
|
||||
print(" Cron jobs may NOT be firing. Restart: hermes gateway restart")
|
||||
elif hb_age is not None and ok_age is not None and ok_age > STALE_AFTER:
|
||||
# Loop is alive (fresh heartbeat) but no tick has SUCCEEDED in a
|
||||
# long time → ticks are failing every iteration.
|
||||
print(color(
|
||||
"⚠ Gateway and cron ticker are running, but no tick has "
|
||||
f"succeeded in {int(ok_age)}s — ticks may be failing.",
|
||||
Colors.YELLOW,
|
||||
))
|
||||
print(f" PID: {', '.join(map(str, pids))}")
|
||||
print(" Check the gateway log for 'Cron tick error'.")
|
||||
else:
|
||||
print(color("✓ Gateway is running — cron jobs will fire automatically", Colors.GREEN))
|
||||
print(f" PID: {', '.join(map(str, pids))}")
|
||||
if hb_age is not None:
|
||||
print(f" Ticker heartbeat: {int(hb_age)}s ago")
|
||||
else:
|
||||
print(color("✗ Gateway is not running — cron jobs will NOT fire", Colors.RED))
|
||||
print()
|
||||
|
|
|
|||
|
|
@ -332,3 +332,192 @@ def test_fire_due_missing_job_does_not_run(monkeypatch):
|
|||
|
||||
assert InProcessCronScheduler().fire_due("gone") is False
|
||||
assert ran == []
|
||||
|
||||
|
||||
# ── F2a: ticker liveness — survival, heartbeat, honest status (#32612, #32895) ──
|
||||
|
||||
|
||||
def test_ticker_survives_baseexception_from_tick():
|
||||
"""A BaseException (e.g. SystemExit from a provider SDK) raised by tick()
|
||||
must NOT kill the ticker loop — it logs and keeps looping (#32612)."""
|
||||
from cron.scheduler_provider import InProcessCronScheduler
|
||||
|
||||
calls = []
|
||||
|
||||
def _boom(*a, **k):
|
||||
calls.append(1)
|
||||
if len(calls) == 1:
|
||||
raise SystemExit("provider SDK called sys.exit")
|
||||
return 0
|
||||
|
||||
stop = threading.Event()
|
||||
prov = InProcessCronScheduler()
|
||||
with patch("cron.scheduler.tick", side_effect=_boom), \
|
||||
patch("cron.jobs.record_ticker_heartbeat"):
|
||||
t = threading.Thread(target=prov.start, args=(stop,), kwargs={"interval": 0}, daemon=True)
|
||||
t.start()
|
||||
time.sleep(0.2)
|
||||
stop.set()
|
||||
t.join(timeout=5)
|
||||
|
||||
assert not t.is_alive(), "ticker thread died on BaseException instead of surviving"
|
||||
assert len(calls) >= 2, "ticker did not keep ticking after the BaseException"
|
||||
|
||||
|
||||
def test_ticker_records_heartbeat_each_iteration():
|
||||
"""The loop records a liveness heartbeat on start and after each tick,
|
||||
bumping the success marker only on a clean tick."""
|
||||
from cron.scheduler_provider import InProcessCronScheduler
|
||||
|
||||
beats = [] # (success,) per call
|
||||
stop = threading.Event()
|
||||
prov = InProcessCronScheduler()
|
||||
with patch("cron.scheduler.tick", side_effect=lambda *a, **k: 0), \
|
||||
patch("cron.jobs.record_ticker_heartbeat",
|
||||
side_effect=lambda success=False: beats.append(success)):
|
||||
t = threading.Thread(target=prov.start, args=(stop,), kwargs={"interval": 0}, daemon=True)
|
||||
t.start()
|
||||
time.sleep(0.2)
|
||||
stop.set()
|
||||
t.join(timeout=5)
|
||||
|
||||
# one pre-loop liveness beat (success=False) + post-tick beats with success=True
|
||||
assert len(beats) >= 2, "ticker did not record heartbeats"
|
||||
assert beats[0] is False, "pre-loop beat should be liveness-only"
|
||||
assert any(b is True for b in beats[1:]), "successful tick did not bump success marker"
|
||||
|
||||
|
||||
def test_failing_tick_records_liveness_but_not_success():
|
||||
"""A tick that raises bumps the liveness heartbeat but NOT the success
|
||||
marker — so status can distinguish 'alive but failing' from 'firing'."""
|
||||
from cron.scheduler_provider import InProcessCronScheduler
|
||||
|
||||
beats = []
|
||||
stop = threading.Event()
|
||||
prov = InProcessCronScheduler()
|
||||
with patch("cron.scheduler.tick", side_effect=RuntimeError("every tick fails")), \
|
||||
patch("cron.jobs.record_ticker_heartbeat",
|
||||
side_effect=lambda success=False: beats.append(success)):
|
||||
t = threading.Thread(target=prov.start, args=(stop,), kwargs={"interval": 0}, daemon=True)
|
||||
t.start()
|
||||
time.sleep(0.2)
|
||||
stop.set()
|
||||
t.join(timeout=5)
|
||||
|
||||
# every post-tick beat must be success=False (ticks always failed)
|
||||
assert len(beats) >= 2
|
||||
assert all(b is False for b in beats), "a failing tick wrongly bumped the success marker"
|
||||
|
||||
|
||||
def test_heartbeat_roundtrip_and_age(tmp_path, monkeypatch):
|
||||
"""record_ticker_heartbeat writes fresh timestamps atomically; the age
|
||||
getters read them back as small positive ages."""
|
||||
import cron.jobs as jobs
|
||||
|
||||
cron_dir = tmp_path / "cron"
|
||||
monkeypatch.setattr(jobs, "CRON_DIR", cron_dir)
|
||||
monkeypatch.setattr(jobs, "OUTPUT_DIR", cron_dir / "output")
|
||||
monkeypatch.setattr(jobs, "TICKER_HEARTBEAT_FILE", cron_dir / "ticker_heartbeat")
|
||||
monkeypatch.setattr(jobs, "TICKER_SUCCESS_FILE", cron_dir / "ticker_last_success")
|
||||
|
||||
# No files yet -> unknown (None), NOT "dead"
|
||||
assert jobs.get_ticker_heartbeat_age() is None
|
||||
assert jobs.get_ticker_success_age() is None
|
||||
|
||||
# liveness-only: heartbeat set, success still unknown
|
||||
jobs.record_ticker_heartbeat(success=False)
|
||||
hb = jobs.get_ticker_heartbeat_age()
|
||||
assert hb is not None and 0.0 <= hb < 5.0
|
||||
assert jobs.get_ticker_success_age() is None
|
||||
|
||||
# success: both set
|
||||
jobs.record_ticker_heartbeat(success=True)
|
||||
ok = jobs.get_ticker_success_age()
|
||||
assert ok is not None and 0.0 <= ok < 5.0
|
||||
|
||||
|
||||
def test_heartbeat_age_detects_staleness(tmp_path, monkeypatch):
|
||||
"""A heartbeat written far in the past reads back as a large age."""
|
||||
import cron.jobs as jobs
|
||||
|
||||
cron_dir = tmp_path / "cron"
|
||||
cron_dir.mkdir(parents=True)
|
||||
hb = cron_dir / "ticker_heartbeat"
|
||||
monkeypatch.setattr(jobs, "CRON_DIR", cron_dir)
|
||||
monkeypatch.setattr(jobs, "TICKER_HEARTBEAT_FILE", hb)
|
||||
|
||||
import time as _t
|
||||
hb.write_text(str(_t.time() - 10_000), encoding="utf-8")
|
||||
age = jobs.get_ticker_heartbeat_age()
|
||||
assert age is not None and age > 9_000
|
||||
|
||||
|
||||
def test_heartbeat_write_failure_is_silent(tmp_path, monkeypatch):
|
||||
"""A real atomic-write failure must be swallowed AND leave no temp file.
|
||||
|
||||
Point CRON_DIR at a path that cannot be created (its parent is a regular
|
||||
file), so ensure_dirs()/mkstemp inside _atomic_write_epoch genuinely fail.
|
||||
record_ticker_heartbeat must not raise, and no stray .hb_*.tmp may leak.
|
||||
"""
|
||||
import cron.jobs as jobs
|
||||
|
||||
blocker = tmp_path / "not_a_dir"
|
||||
blocker.write_text("i am a file, not a directory")
|
||||
bad_cron_dir = blocker / "cron" # parent is a file -> mkdir/mkstemp fail
|
||||
monkeypatch.setattr(jobs, "CRON_DIR", bad_cron_dir)
|
||||
monkeypatch.setattr(jobs, "OUTPUT_DIR", bad_cron_dir / "output")
|
||||
monkeypatch.setattr(jobs, "TICKER_HEARTBEAT_FILE", bad_cron_dir / "ticker_heartbeat")
|
||||
monkeypatch.setattr(jobs, "TICKER_SUCCESS_FILE", bad_cron_dir / "ticker_last_success")
|
||||
|
||||
jobs.record_ticker_heartbeat(success=True) # must not raise
|
||||
|
||||
# The write never succeeded, so no heartbeat is recorded...
|
||||
assert jobs.get_ticker_heartbeat_age() is None
|
||||
# ...and no stray temp file leaked anywhere under tmp_path.
|
||||
assert not list(tmp_path.rglob(".hb_*.tmp")), "atomic write leaked a temp file on failure"
|
||||
|
||||
|
||||
def test_cron_status_reports_alive_but_failing(tmp_path, monkeypatch, capsys):
|
||||
"""cron_status warns when the ticker is alive (fresh heartbeat) but no tick
|
||||
has succeeded recently (#32612: alive-but-failing must not look healthy)."""
|
||||
import cron.jobs as jobs
|
||||
from hermes_cli import cron as cron_cli
|
||||
|
||||
monkeypatch.setattr("hermes_cli.gateway.find_gateway_pids", lambda: [4321])
|
||||
monkeypatch.setattr(jobs, "get_ticker_heartbeat_age", lambda: 5.0) # fresh
|
||||
monkeypatch.setattr(jobs, "get_ticker_success_age", lambda: 9_999.0) # stale
|
||||
monkeypatch.setattr("cron.jobs.list_jobs", lambda **k: [])
|
||||
|
||||
cron_cli.cron_status()
|
||||
out = capsys.readouterr().out
|
||||
assert "no tick has succeeded" in out
|
||||
assert "will fire automatically" not in out
|
||||
|
||||
|
||||
def test_cron_status_healthy_when_both_fresh(tmp_path, monkeypatch, capsys):
|
||||
import cron.jobs as jobs
|
||||
from hermes_cli import cron as cron_cli
|
||||
|
||||
monkeypatch.setattr("hermes_cli.gateway.find_gateway_pids", lambda: [4321])
|
||||
monkeypatch.setattr(jobs, "get_ticker_heartbeat_age", lambda: 5.0)
|
||||
monkeypatch.setattr(jobs, "get_ticker_success_age", lambda: 5.0)
|
||||
monkeypatch.setattr("cron.jobs.list_jobs", lambda **k: [])
|
||||
|
||||
cron_cli.cron_status()
|
||||
out = capsys.readouterr().out
|
||||
assert "will fire automatically" in out
|
||||
|
||||
|
||||
def test_cron_status_reports_stalled_when_no_heartbeat(tmp_path, monkeypatch, capsys):
|
||||
import cron.jobs as jobs
|
||||
from hermes_cli import cron as cron_cli
|
||||
|
||||
monkeypatch.setattr("hermes_cli.gateway.find_gateway_pids", lambda: [4321])
|
||||
monkeypatch.setattr(jobs, "get_ticker_heartbeat_age", lambda: 9_999.0) # dead
|
||||
monkeypatch.setattr(jobs, "get_ticker_success_age", lambda: 9_999.0)
|
||||
monkeypatch.setattr("cron.jobs.list_jobs", lambda **k: [])
|
||||
|
||||
cron_cli.cron_status()
|
||||
out = capsys.readouterr().out
|
||||
assert "STALLED" in out
|
||||
assert "will fire automatically" not in out
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue