From abbd8646eb511833500377799f5853d8d4eda5a2 Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 18 Jun 2026 14:14:53 +1000 Subject: [PATCH] feat(gateway,desktop): start cron via resolved CronScheduler provider MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 3 — rebind both ticker call sites to resolve_cron_scheduler(). Default (built-in) path is byte-identical; Phase 0 characterization tests + the full gateway suite (6919) stay green. Task 3.1: split gateway/run.py _start_cron_ticker into: - _start_gateway_housekeeping() — the gateway-only chores (channel-dir refresh, image/doc cache cleanup, paste sweep, curator poll), now on their own loop/thread, independent of which cron provider is active. - _start_cron_ticker() — kept as a DEPRECATED shim that runs only the built-in InProcessCronScheduler().start(), preserving the symbol for hermes_cli/debug.py and the Phase 0 characterization test. Task 3.2: start_gateway() resolves the provider and runs provider.start() in the 'cron-scheduler' thread, plus a second 'gateway-housekeeping' thread; teardown sets the shared cron_stop, calls provider.stop(), joins both. Task 3.3: desktop _start_desktop_cron_ticker() swapped its inline tick loop for resolve_cron_scheduler().start() (no adapters/loop — desktop has none). The provider owns ONLY the cron tick (so an external scale-to-zero provider with no 60s loop fits); gateway housekeeping is decoupled from the cron trigger. Both threads share cron_stop. Verified: full tests/cron/ (453) + full tests/gateway/ (6919) green. Manual gateway smoke (Task 3.4) is operator-run, pending. --- gateway/run.py | 87 +++++++++++++++++++++++++++------------- hermes_cli/web_server.py | 25 +++++------- 2 files changed, 70 insertions(+), 42 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 4b41cfc6aec..2f5900e92f5 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -16454,21 +16454,20 @@ def _run_planned_stop_watcher( stop_event.wait(poll_interval) -def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60): - """ - Background thread that ticks the cron scheduler at a regular interval. - - Runs inside the gateway process so cronjobs fire automatically without - needing a separate `hermes cron daemon` or system cron entry. +def _start_gateway_housekeeping(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60): + """Background thread for gateway-only periodic chores (NOT cron). - When ``adapters`` and ``loop`` are provided, passes them through to the - cron delivery path so live adapters can be used for E2EE rooms. + Split out of the historical ``_start_cron_ticker`` so the cron *trigger* + can live behind the ``CronScheduler`` provider (built-in or external) while + these gateway-specific chores keep running independently of which provider + fires cron. An external scale-to-zero provider has no 60s loop at all, but + this housekeeping still wants its hourly cadence — so it owns its own loop. - Also refreshes the channel directory every 5 minutes and prunes the - image/audio/document cache + expired ``hermes debug share`` pastes - once per hour. + Refreshes the channel directory every 5 minutes and prunes the + image/audio/document cache + expired ``hermes debug share`` pastes once per + hour, and polls the curator hourly (its inner gate enforces the real + weekly cadence). """ - from cron.scheduler import tick as cron_tick from gateway.platforms.base import cleanup_image_cache, cleanup_document_cache from hermes_cli.debug import _sweep_expired_pastes @@ -16477,14 +16476,9 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in PASTE_SWEEP_EVERY = 60 # ticks — once per hour CURATOR_EVERY = 60 # ticks — poll hourly (inner gate handles the real cadence) - logger.info("Cron ticker started (interval=%ds)", interval) + logger.info("Gateway housekeeping started (interval=%ds)", interval) tick_count = 0 while not stop_event.is_set(): - try: - cron_tick(verbose=False, adapters=adapters, loop=loop, sync=False) - except Exception as e: - logger.debug("Cron tick error: %s", e) - tick_count += 1 if tick_count % CHANNEL_DIR_EVERY == 0 and adapters: @@ -16492,9 +16486,9 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in from gateway.channel_directory import build_channel_directory if loop is not None: # build_channel_directory is async (Slack web calls), and - # this ticker runs in a background thread. Schedule onto - # the gateway event loop and wait briefly for completion - # so refresh failures are still logged via the except. + # this runs in a background thread. Schedule onto the + # gateway event loop and wait briefly for completion so + # refresh failures are still logged via the except. fut = safe_schedule_threadsafe( build_channel_directory(adapters), loop, logger=logger, @@ -16530,7 +16524,7 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in except Exception as e: logger.debug("Paste sweep error: %s", e) - # Curator — piggy-back on the existing cron ticker so long-running + # Curator — piggy-back on the housekeeping loop so long-running # gateways get weekly skill maintenance without needing restarts. # maybe_run_curator() is internally gated by config.interval_hours # (7 days by default), so CURATOR_EVERY is just the poll rate — the @@ -16546,7 +16540,22 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in logger.debug("Curator tick error: %s", e) stop_event.wait(timeout=interval) - logger.info("Cron ticker stopped") + logger.info("Gateway housekeeping stopped") + + +def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60): + """DEPRECATED shim — preserved for backward compatibility. + + The cron trigger now lives behind the ``CronScheduler`` provider + (``cron.scheduler_provider``); the gateway resolves a provider and runs its + ``start()`` directly (see ``start_gateway``). This shim runs ONLY the + built-in in-process tick loop, exactly as before, for any external caller + or test that still references this symbol (e.g. hermes_cli/debug.py). It no + longer runs gateway housekeeping — that moved to + ``_start_gateway_housekeeping``. + """ + from cron.scheduler_provider import InProcessCronScheduler + InProcessCronScheduler().start(stop_event, adapters=adapters, loop=loop, interval=interval) async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = False, verbosity: Optional[int] = 0) -> bool: @@ -16942,17 +16951,34 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = logger.error("Gateway exiting cleanly: %s", runner.exit_reason) return True - # Start background cron ticker so scheduled jobs fire automatically. - # Pass the event loop so cron delivery can use live adapters (E2EE support). + # Start the background cron scheduler via the resolved provider so + # scheduled jobs fire automatically. The built-in provider is the + # historical in-process 60s ticker; an external provider (e.g. chronos) + # may arm a schedule and return. Pass the event loop so cron delivery can + # use live adapters (E2EE support). + from cron.scheduler_provider import resolve_cron_scheduler cron_stop = threading.Event() + cron_provider = resolve_cron_scheduler() cron_thread = threading.Thread( - target=_start_cron_ticker, + target=cron_provider.start, args=(cron_stop,), kwargs={"adapters": runner.adapters, "loop": asyncio.get_running_loop()}, daemon=True, - name="cron-ticker", + name="cron-scheduler", ) cron_thread.start() + + # Gateway-only periodic housekeeping (channel dir, cache cleanup, paste + # sweep, curator) — runs independently of which cron provider is active. + # Shares cron_stop as the shutdown signal. + housekeeping_thread = threading.Thread( + target=_start_gateway_housekeeping, + args=(cron_stop,), + kwargs={"adapters": runner.adapters, "loop": asyncio.get_running_loop()}, + daemon=True, + name="gateway-housekeeping", + ) + housekeeping_thread.start() # Wait for shutdown await runner.wait_for_shutdown() @@ -16962,9 +16988,14 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = logger.error("Gateway exiting with failure: %s", runner.exit_reason) return False - # Stop cron ticker cleanly + # Stop cron scheduler + housekeeping cleanly cron_stop.set() + try: + cron_provider.stop() + except Exception as e: + logger.debug("Cron provider stop() error: %s", e) cron_thread.join(timeout=5) + housekeeping_thread.join(timeout=5) # Stop the planned-stop watcher (daemon=True so this is belt-and-suspenders). _planned_stop_watcher_stop.set() diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index 70f39162cf8..768084eba36 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -113,23 +113,20 @@ def _start_desktop_cron_ticker(stop_event: "threading.Event", interval: int = 60 The scheduler tick loop normally lives in ``hermes gateway run`` — but the desktop app spawns a ``hermes dashboard`` backend, not a gateway, so a cron - a user creates in the app would never fire. We run a minimal ticker here - (no live adapters; delivery falls back to the per-platform send path). + a user creates in the app would never fire. We run the resolved cron + scheduler provider here (no live adapters; delivery falls back to the + per-platform send path). - Cross-process safe: ``cron.scheduler.tick`` takes the ``cron/.tick.lock`` - file lock, so this never double-fires alongside a real gateway on the same - HERMES_HOME — whichever process grabs the lock first wins the tick. + Cross-process safe: the built-in provider's ``cron.scheduler.tick`` takes + the ``cron/.tick.lock`` file lock, so this never double-fires alongside a + real gateway on the same HERMES_HOME — whichever process grabs the lock + first wins the tick. """ - from cron.scheduler import tick as cron_tick + from cron.scheduler_provider import resolve_cron_scheduler - _log.info("Desktop cron ticker started (interval=%ds)", interval) - # Tick once up front (catches jobs due at launch), then on the interval. - while not stop_event.is_set(): - try: - cron_tick(verbose=False, sync=False) - except Exception as e: - _log.debug("Desktop cron tick error: %s", e) - stop_event.wait(interval) + provider = resolve_cron_scheduler() + _log.info("Desktop cron scheduler started (provider=%s, interval=%ds)", provider.name, interval) + provider.start(stop_event, interval=interval) @asynccontextmanager