feat(gateway,desktop): start cron via resolved CronScheduler provider

Phase 3 — rebind both ticker call sites to resolve_cron_scheduler(). Default
(built-in) path is byte-identical; Phase 0 characterization tests + the full
gateway suite (6919) stay green.

Task 3.1: split gateway/run.py _start_cron_ticker into:
  - _start_gateway_housekeeping() — the gateway-only chores (channel-dir
    refresh, image/doc cache cleanup, paste sweep, curator poll), now on their
    own loop/thread, independent of which cron provider is active.
  - _start_cron_ticker() — kept as a DEPRECATED shim that runs only the
    built-in InProcessCronScheduler().start(), preserving the symbol for
    hermes_cli/debug.py and the Phase 0 characterization test.
Task 3.2: start_gateway() resolves the provider and runs provider.start() in
  the 'cron-scheduler' thread, plus a second 'gateway-housekeeping' thread;
  teardown sets the shared cron_stop, calls provider.stop(), joins both.
Task 3.3: desktop _start_desktop_cron_ticker() swapped its inline tick loop for
  resolve_cron_scheduler().start() (no adapters/loop — desktop has none).

The provider owns ONLY the cron tick (so an external scale-to-zero provider
with no 60s loop fits); gateway housekeeping is decoupled from the cron
trigger. Both threads share cron_stop.

Verified: full tests/cron/ (453) + full tests/gateway/ (6919) green. Manual
gateway smoke (Task 3.4) is operator-run, pending.
This commit is contained in:
Ben 2026-06-18 14:14:53 +10:00
parent ae8fa11097
commit abbd8646eb
2 changed files with 70 additions and 42 deletions

View file

@ -16454,21 +16454,20 @@ def _run_planned_stop_watcher(
stop_event.wait(poll_interval)
def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60):
"""
Background thread that ticks the cron scheduler at a regular interval.
Runs inside the gateway process so cronjobs fire automatically without
needing a separate `hermes cron daemon` or system cron entry.
def _start_gateway_housekeeping(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60):
"""Background thread for gateway-only periodic chores (NOT cron).
When ``adapters`` and ``loop`` are provided, passes them through to the
cron delivery path so live adapters can be used for E2EE rooms.
Split out of the historical ``_start_cron_ticker`` so the cron *trigger*
can live behind the ``CronScheduler`` provider (built-in or external) while
these gateway-specific chores keep running independently of which provider
fires cron. An external scale-to-zero provider has no 60s loop at all, but
this housekeeping still wants its hourly cadence so it owns its own loop.
Also refreshes the channel directory every 5 minutes and prunes the
image/audio/document cache + expired ``hermes debug share`` pastes
once per hour.
Refreshes the channel directory every 5 minutes and prunes the
image/audio/document cache + expired ``hermes debug share`` pastes once per
hour, and polls the curator hourly (its inner gate enforces the real
weekly cadence).
"""
from cron.scheduler import tick as cron_tick
from gateway.platforms.base import cleanup_image_cache, cleanup_document_cache
from hermes_cli.debug import _sweep_expired_pastes
@ -16477,14 +16476,9 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
PASTE_SWEEP_EVERY = 60 # ticks — once per hour
CURATOR_EVERY = 60 # ticks — poll hourly (inner gate handles the real cadence)
logger.info("Cron ticker started (interval=%ds)", interval)
logger.info("Gateway housekeeping started (interval=%ds)", interval)
tick_count = 0
while not stop_event.is_set():
try:
cron_tick(verbose=False, adapters=adapters, loop=loop, sync=False)
except Exception as e:
logger.debug("Cron tick error: %s", e)
tick_count += 1
if tick_count % CHANNEL_DIR_EVERY == 0 and adapters:
@ -16492,9 +16486,9 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
from gateway.channel_directory import build_channel_directory
if loop is not None:
# build_channel_directory is async (Slack web calls), and
# this ticker runs in a background thread. Schedule onto
# the gateway event loop and wait briefly for completion
# so refresh failures are still logged via the except.
# this runs in a background thread. Schedule onto the
# gateway event loop and wait briefly for completion so
# refresh failures are still logged via the except.
fut = safe_schedule_threadsafe(
build_channel_directory(adapters), loop,
logger=logger,
@ -16530,7 +16524,7 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
except Exception as e:
logger.debug("Paste sweep error: %s", e)
# Curator — piggy-back on the existing cron ticker so long-running
# Curator — piggy-back on the housekeeping loop so long-running
# gateways get weekly skill maintenance without needing restarts.
# maybe_run_curator() is internally gated by config.interval_hours
# (7 days by default), so CURATOR_EVERY is just the poll rate — the
@ -16546,7 +16540,22 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
logger.debug("Curator tick error: %s", e)
stop_event.wait(timeout=interval)
logger.info("Cron ticker stopped")
logger.info("Gateway housekeeping stopped")
def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60):
"""DEPRECATED shim — preserved for backward compatibility.
The cron trigger now lives behind the ``CronScheduler`` provider
(``cron.scheduler_provider``); the gateway resolves a provider and runs its
``start()`` directly (see ``start_gateway``). This shim runs ONLY the
built-in in-process tick loop, exactly as before, for any external caller
or test that still references this symbol (e.g. hermes_cli/debug.py). It no
longer runs gateway housekeeping that moved to
``_start_gateway_housekeeping``.
"""
from cron.scheduler_provider import InProcessCronScheduler
InProcessCronScheduler().start(stop_event, adapters=adapters, loop=loop, interval=interval)
async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = False, verbosity: Optional[int] = 0) -> bool:
@ -16942,17 +16951,34 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
logger.error("Gateway exiting cleanly: %s", runner.exit_reason)
return True
# Start background cron ticker so scheduled jobs fire automatically.
# Pass the event loop so cron delivery can use live adapters (E2EE support).
# Start the background cron scheduler via the resolved provider so
# scheduled jobs fire automatically. The built-in provider is the
# historical in-process 60s ticker; an external provider (e.g. chronos)
# may arm a schedule and return. Pass the event loop so cron delivery can
# use live adapters (E2EE support).
from cron.scheduler_provider import resolve_cron_scheduler
cron_stop = threading.Event()
cron_provider = resolve_cron_scheduler()
cron_thread = threading.Thread(
target=_start_cron_ticker,
target=cron_provider.start,
args=(cron_stop,),
kwargs={"adapters": runner.adapters, "loop": asyncio.get_running_loop()},
daemon=True,
name="cron-ticker",
name="cron-scheduler",
)
cron_thread.start()
# Gateway-only periodic housekeeping (channel dir, cache cleanup, paste
# sweep, curator) — runs independently of which cron provider is active.
# Shares cron_stop as the shutdown signal.
housekeeping_thread = threading.Thread(
target=_start_gateway_housekeeping,
args=(cron_stop,),
kwargs={"adapters": runner.adapters, "loop": asyncio.get_running_loop()},
daemon=True,
name="gateway-housekeeping",
)
housekeeping_thread.start()
# Wait for shutdown
await runner.wait_for_shutdown()
@ -16962,9 +16988,14 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
logger.error("Gateway exiting with failure: %s", runner.exit_reason)
return False
# Stop cron ticker cleanly
# Stop cron scheduler + housekeeping cleanly
cron_stop.set()
try:
cron_provider.stop()
except Exception as e:
logger.debug("Cron provider stop() error: %s", e)
cron_thread.join(timeout=5)
housekeeping_thread.join(timeout=5)
# Stop the planned-stop watcher (daemon=True so this is belt-and-suspenders).
_planned_stop_watcher_stop.set()

View file

@ -113,23 +113,20 @@ def _start_desktop_cron_ticker(stop_event: "threading.Event", interval: int = 60
The scheduler tick loop normally lives in ``hermes gateway run`` but the
desktop app spawns a ``hermes dashboard`` backend, not a gateway, so a cron
a user creates in the app would never fire. We run a minimal ticker here
(no live adapters; delivery falls back to the per-platform send path).
a user creates in the app would never fire. We run the resolved cron
scheduler provider here (no live adapters; delivery falls back to the
per-platform send path).
Cross-process safe: ``cron.scheduler.tick`` takes the ``cron/.tick.lock``
file lock, so this never double-fires alongside a real gateway on the same
HERMES_HOME whichever process grabs the lock first wins the tick.
Cross-process safe: the built-in provider's ``cron.scheduler.tick`` takes
the ``cron/.tick.lock`` file lock, so this never double-fires alongside a
real gateway on the same HERMES_HOME whichever process grabs the lock
first wins the tick.
"""
from cron.scheduler import tick as cron_tick
from cron.scheduler_provider import resolve_cron_scheduler
_log.info("Desktop cron ticker started (interval=%ds)", interval)
# Tick once up front (catches jobs due at launch), then on the interval.
while not stop_event.is_set():
try:
cron_tick(verbose=False, sync=False)
except Exception as e:
_log.debug("Desktop cron tick error: %s", e)
stop_event.wait(interval)
provider = resolve_cron_scheduler()
_log.info("Desktop cron scheduler started (provider=%s, interval=%ds)", provider.name, interval)
provider.start(stop_event, interval=interval)
@asynccontextmanager