From c86915024e23be0572990b4ce6809f2f8af1c677 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Mon, 20 Apr 2026 11:53:07 -0700 Subject: [PATCH] fix(cron): run due jobs in parallel to prevent serial tick starvation (#13021) Replaces the serial for-loop in tick() with ThreadPoolExecutor so all jobs due in a single tick run concurrently. A slow job no longer blocks others from executing, fixing silent job skipping (issue #9086). Thread safety: - Session/delivery env vars migrated from os.environ to ContextVars (gateway/session_context.py) so parallel jobs can't clobber each other's delivery targets. Each thread gets its own copied context. - jobs.json read-modify-write cycles (advance_next_run, mark_job_run) protected by threading.Lock to prevent concurrent save clobber. - send_message_tool reads delivery vars via get_session_env() for ContextVar-aware resolution with os.environ fallback. Configuration: - cron.max_parallel_jobs in config.yaml (null = unbounded, 1 = serial) - HERMES_CRON_MAX_PARALLEL env var override Based on PR #9169 by @VenomMoth1. Fixes #9086 --- cron/jobs.py | 100 ++++++++++++++------------- cron/scheduler.py | 90 ++++++++++++++++-------- gateway/session_context.py | 9 +++ hermes_cli/config.py | 5 ++ tests/cron/test_scheduler.py | 129 ++++++++++++++++++++++++++++++++++- tools/send_message_tool.py | 7 +- 6 files changed, 259 insertions(+), 81 deletions(-) diff --git a/cron/jobs.py b/cron/jobs.py index 06d782888..8fb3f868a 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -9,6 +9,7 @@ import copy import json import logging import tempfile +import threading import os import re import uuid @@ -34,6 +35,11 @@ except ImportError: HERMES_DIR = get_hermes_home().resolve() CRON_DIR = HERMES_DIR / "cron" JOBS_FILE = CRON_DIR / "jobs.json" + +# In-process lock protecting load_jobs→modify→save_jobs cycles. +# Required when tick() runs jobs in parallel threads — without this, +# concurrent mark_job_run / advance_next_run calls can clobber each other. +_jobs_file_lock = threading.Lock() OUTPUT_DIR = CRON_DIR / "output" ONESHOT_GRACE_SECONDS = 120 @@ -594,43 +600,44 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None, ``delivery_error`` is tracked separately from the agent error — a job can succeed (agent produced output) but fail delivery (platform down). """ - jobs = load_jobs() - for i, job in enumerate(jobs): - if job["id"] == job_id: - now = _hermes_now().isoformat() - job["last_run_at"] = now - job["last_status"] = "ok" if success else "error" - job["last_error"] = error if not success else None - # Track delivery failures separately — cleared on successful delivery - job["last_delivery_error"] = delivery_error - - # Increment completed count - if job.get("repeat"): - job["repeat"]["completed"] = job["repeat"].get("completed", 0) + 1 + with _jobs_file_lock: + jobs = load_jobs() + for i, job in enumerate(jobs): + if job["id"] == job_id: + now = _hermes_now().isoformat() + job["last_run_at"] = now + job["last_status"] = "ok" if success else "error" + job["last_error"] = error if not success else None + # Track delivery failures separately — cleared on successful delivery + job["last_delivery_error"] = delivery_error - # Check if we've hit the repeat limit - times = job["repeat"].get("times") - completed = job["repeat"]["completed"] - if times is not None and times > 0 and completed >= times: - # Remove the job (limit reached) - jobs.pop(i) - save_jobs(jobs) - return - - # Compute next run - job["next_run_at"] = compute_next_run(job["schedule"], now) + # Increment completed count + if job.get("repeat"): + job["repeat"]["completed"] = job["repeat"].get("completed", 0) + 1 + + # Check if we've hit the repeat limit + times = job["repeat"].get("times") + completed = job["repeat"]["completed"] + if times is not None and times > 0 and completed >= times: + # Remove the job (limit reached) + jobs.pop(i) + save_jobs(jobs) + return + + # Compute next run + job["next_run_at"] = compute_next_run(job["schedule"], now) - # If no next run (one-shot completed), disable - if job["next_run_at"] is None: - job["enabled"] = False - job["state"] = "completed" - elif job.get("state") != "paused": - job["state"] = "scheduled" + # If no next run (one-shot completed), disable + if job["next_run_at"] is None: + job["enabled"] = False + job["state"] = "completed" + elif job.get("state") != "paused": + job["state"] = "scheduled" - save_jobs(jobs) - return + save_jobs(jobs) + return - logger.warning("mark_job_run: job_id %s not found, skipping save", job_id) + logger.warning("mark_job_run: job_id %s not found, skipping save", job_id) def advance_next_run(job_id: str) -> bool: @@ -645,20 +652,21 @@ def advance_next_run(job_id: str) -> bool: Returns True if next_run_at was advanced, False otherwise. """ - jobs = load_jobs() - for job in jobs: - if job["id"] == job_id: - kind = job.get("schedule", {}).get("kind") - if kind not in ("cron", "interval"): + with _jobs_file_lock: + jobs = load_jobs() + for job in jobs: + if job["id"] == job_id: + kind = job.get("schedule", {}).get("kind") + if kind not in ("cron", "interval"): + return False + now = _hermes_now().isoformat() + new_next = compute_next_run(job["schedule"], now) + if new_next and new_next != job.get("next_run_at"): + job["next_run_at"] = new_next + save_jobs(jobs) + return True return False - now = _hermes_now().isoformat() - new_next = compute_next_run(job["schedule"], now) - if new_next and new_next != job.get("next_run_at"): - job["next_run_at"] = new_next - save_jobs(jobs) - return True - return False - return False + return False def get_due_jobs() -> List[Dict[str, Any]]: diff --git a/cron/scheduler.py b/cron/scheduler.py index ebeb29dd4..4b131859b 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -747,14 +747,17 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: # scheduler process — every job this process runs is a cron job. os.environ["HERMES_CRON_SESSION"] = "1" + # Use ContextVars for per-job session/delivery state so parallel jobs + # don't clobber each other's targets (os.environ is process-global). + from gateway.session_context import set_session_vars, clear_session_vars, _VAR_MAP + + _ctx_tokens = set_session_vars( + platform=origin["platform"] if origin else "", + chat_id=str(origin["chat_id"]) if origin else "", + chat_name=origin.get("chat_name", "") if origin else "", + ) + try: - # Inject origin context so the agent's send_message tool knows the chat. - # Must be INSIDE the try block so the finally cleanup always runs. - if origin: - os.environ["HERMES_SESSION_PLATFORM"] = origin["platform"] - os.environ["HERMES_SESSION_CHAT_ID"] = str(origin["chat_id"]) - if origin.get("chat_name"): - os.environ["HERMES_SESSION_CHAT_NAME"] = origin["chat_name"] # Re-read .env and config.yaml fresh every run so provider/key # changes take effect without a gateway restart. from dotenv import load_dotenv @@ -765,10 +768,10 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: delivery_target = _resolve_delivery_target(job) if delivery_target: - os.environ["HERMES_CRON_AUTO_DELIVER_PLATFORM"] = delivery_target["platform"] - os.environ["HERMES_CRON_AUTO_DELIVER_CHAT_ID"] = str(delivery_target["chat_id"]) + _VAR_MAP["HERMES_CRON_AUTO_DELIVER_PLATFORM"].set(delivery_target["platform"]) + _VAR_MAP["HERMES_CRON_AUTO_DELIVER_CHAT_ID"].set(str(delivery_target["chat_id"])) if delivery_target.get("thread_id") is not None: - os.environ["HERMES_CRON_AUTO_DELIVER_THREAD_ID"] = str(delivery_target["thread_id"]) + _VAR_MAP["HERMES_CRON_AUTO_DELIVER_THREAD_ID"].set(str(delivery_target["thread_id"])) model = job.get("model") or os.getenv("HERMES_MODEL") or "" @@ -1012,16 +1015,8 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: return False, output, "", error_msg finally: - # Clean up injected env vars so they don't leak to other jobs - for key in ( - "HERMES_SESSION_PLATFORM", - "HERMES_SESSION_CHAT_ID", - "HERMES_SESSION_CHAT_NAME", - "HERMES_CRON_AUTO_DELIVER_PLATFORM", - "HERMES_CRON_AUTO_DELIVER_CHAT_ID", - "HERMES_CRON_AUTO_DELIVER_THREAD_ID", - ): - os.environ.pop(key, None) + # Clean up ContextVar session/delivery state for this job. + clear_session_vars(_ctx_tokens) if _session_db: try: _session_db.end_session(_cron_session_id, "cron_complete") @@ -1074,15 +1069,42 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int: if verbose: logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs)) - executed = 0 + # Advance next_run_at for all recurring jobs FIRST, under the file lock, + # before any execution begins. This preserves at-most-once semantics. for job in due_jobs: - try: - # For recurring jobs (cron/interval), advance next_run_at to the - # next future occurrence BEFORE execution. This way, if the - # process crashes mid-run, the job won't re-fire on restart. - # One-shot jobs are left alone so they can retry on restart. - advance_next_run(job["id"]) + advance_next_run(job["id"]) + # Resolve max parallel workers: env var > config.yaml > unbounded. + # Set HERMES_CRON_MAX_PARALLEL=1 to restore old serial behaviour. + _max_workers: Optional[int] = None + try: + _env_par = os.getenv("HERMES_CRON_MAX_PARALLEL", "").strip() + if _env_par: + _max_workers = int(_env_par) or None + except (ValueError, TypeError): + logger.warning("Invalid HERMES_CRON_MAX_PARALLEL value; defaulting to unbounded") + if _max_workers is None: + try: + from hermes_cli.config import load_config + _ucfg = load_config() or {} + _cfg_par = ( + _ucfg.get("cron", {}) if isinstance(_ucfg, dict) else {} + ).get("max_parallel_jobs") + if _cfg_par is not None: + _max_workers = int(_cfg_par) or None + except Exception: + pass + + if verbose: + logger.info( + "Running %d job(s) in parallel (max_workers=%s)", + len(due_jobs), + _max_workers if _max_workers else "unbounded", + ) + + def _process_job(job: dict) -> bool: + """Run one due job end-to-end: execute, save, deliver, mark.""" + try: success, output, final_response, error = run_job(job) output_file = save_job_output(job["id"], output) @@ -1114,13 +1136,23 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int: error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)" mark_job_run(job["id"], success, error, delivery_error=delivery_error) - executed += 1 + return True except Exception as e: logger.error("Error processing job %s: %s", job['id'], e) mark_job_run(job["id"], False, str(e)) + return False - return executed + # Run all due jobs concurrently, each in its own ContextVar copy + # so session/delivery state stays isolated per-thread. + with concurrent.futures.ThreadPoolExecutor(max_workers=_max_workers) as _tick_pool: + _futures = [] + for job in due_jobs: + _ctx = contextvars.copy_context() + _futures.append(_tick_pool.submit(_ctx.run, _process_job, job)) + _results = [f.result() for f in _futures] + + return sum(_results) finally: if fcntl: fcntl.flock(lock_fd, fcntl.LOCK_UN) diff --git a/gateway/session_context.py b/gateway/session_context.py index 7f8aca3eb..9dc051e3a 100644 --- a/gateway/session_context.py +++ b/gateway/session_context.py @@ -56,6 +56,12 @@ _SESSION_USER_ID: ContextVar = ContextVar("HERMES_SESSION_USER_ID", default=_UNS _SESSION_USER_NAME: ContextVar = ContextVar("HERMES_SESSION_USER_NAME", default=_UNSET) _SESSION_KEY: ContextVar = ContextVar("HERMES_SESSION_KEY", default=_UNSET) +# Cron auto-delivery vars — set per-job in run_job() so concurrent jobs +# don't clobber each other's delivery targets. +_CRON_AUTO_DELIVER_PLATFORM: ContextVar = ContextVar("HERMES_CRON_AUTO_DELIVER_PLATFORM", default=_UNSET) +_CRON_AUTO_DELIVER_CHAT_ID: ContextVar = ContextVar("HERMES_CRON_AUTO_DELIVER_CHAT_ID", default=_UNSET) +_CRON_AUTO_DELIVER_THREAD_ID: ContextVar = ContextVar("HERMES_CRON_AUTO_DELIVER_THREAD_ID", default=_UNSET) + _VAR_MAP = { "HERMES_SESSION_PLATFORM": _SESSION_PLATFORM, "HERMES_SESSION_CHAT_ID": _SESSION_CHAT_ID, @@ -64,6 +70,9 @@ _VAR_MAP = { "HERMES_SESSION_USER_ID": _SESSION_USER_ID, "HERMES_SESSION_USER_NAME": _SESSION_USER_NAME, "HERMES_SESSION_KEY": _SESSION_KEY, + "HERMES_CRON_AUTO_DELIVER_PLATFORM": _CRON_AUTO_DELIVER_PLATFORM, + "HERMES_CRON_AUTO_DELIVER_CHAT_ID": _CRON_AUTO_DELIVER_CHAT_ID, + "HERMES_CRON_AUTO_DELIVER_THREAD_ID": _CRON_AUTO_DELIVER_THREAD_ID, } diff --git a/hermes_cli/config.py b/hermes_cli/config.py index ef5e3d2fc..c046d2b28 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -796,6 +796,11 @@ DEFAULT_CONFIG = { # Wrap delivered cron responses with a header (task name) and footer # ("The agent cannot see this message"). Set to false for clean output. "wrap_response": True, + # Maximum number of due jobs to run in parallel per tick. + # null/0 = unbounded (limited only by thread count). + # 1 = serial (pre-v0.9 behaviour). + # Also overridable via HERMES_CRON_MAX_PARALLEL env var. + "max_parallel_jobs": None, }, # execute_code settings — controls the tool used for programmatic tool calls. diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index b7bcbc9b4..e862638ee 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -772,9 +772,10 @@ class TestRunJobSessionPersistence: pass def run_conversation(self, *args, **kwargs): - seen["platform"] = os.getenv("HERMES_CRON_AUTO_DELIVER_PLATFORM") - seen["chat_id"] = os.getenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID") - seen["thread_id"] = os.getenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID") + from gateway.session_context import get_session_env + seen["platform"] = get_session_env("HERMES_CRON_AUTO_DELIVER_PLATFORM") or None + seen["chat_id"] = get_session_env("HERMES_CRON_AUTO_DELIVER_CHAT_ID") or None + seen["thread_id"] = get_session_env("HERMES_CRON_AUTO_DELIVER_THREAD_ID") or None return {"final_response": "ok"} with patch("cron.scheduler._hermes_home", tmp_path), \ @@ -1457,3 +1458,125 @@ class TestSendMediaViaAdapter: self._run_with_loop(adapter, "123", media_files, None, {"id": "j3"}) adapter.send_voice.assert_called_once() adapter.send_image_file.assert_called_once() + + +class TestParallelTick: + """Verify that tick() runs due jobs concurrently and isolates ContextVars.""" + + @pytest.fixture(autouse=True) + def _isolate_tick_lock(self, tmp_path): + """Point the tick file lock at a per-test temp dir to avoid xdist contention.""" + lock_dir = tmp_path / "cron" + lock_dir.mkdir() + with patch("cron.scheduler._LOCK_DIR", lock_dir), \ + patch("cron.scheduler._LOCK_FILE", lock_dir / ".tick.lock"): + yield + + def test_parallel_jobs_run_concurrently(self): + """Two jobs launched in the same tick should overlap in time.""" + import threading + import time + + barrier = threading.Barrier(2, timeout=5) + call_order = [] + + def mock_run_job(job): + """Each job hits a barrier — both must be active simultaneously.""" + call_order.append(("start", job["id"])) + barrier.wait() # blocks until both threads reach here + call_order.append(("end", job["id"])) + return (True, "output", "response", None) + + jobs = [ + {"id": "job-a", "name": "a", "deliver": "local"}, + {"id": "job-b", "name": "b", "deliver": "local"}, + ] + + with patch("cron.scheduler.get_due_jobs", return_value=jobs), \ + patch("cron.scheduler.advance_next_run"), \ + patch("cron.scheduler.run_job", side_effect=mock_run_job), \ + patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \ + patch("cron.scheduler._deliver_result", return_value=None), \ + patch("cron.scheduler.mark_job_run"): + from cron.scheduler import tick + result = tick(verbose=False) + + assert result == 2 + # Both starts happened before both ends — proof of concurrency + starts = [i for i, (action, _) in enumerate(call_order) if action == "start"] + ends = [i for i, (action, _) in enumerate(call_order) if action == "end"] + assert len(starts) == 2 + assert len(ends) == 2 + assert max(starts) < min(ends), f"Jobs not concurrent: {call_order}" + + def test_parallel_jobs_isolated_contextvars(self): + """Each job's ContextVars must be isolated — no cross-contamination.""" + from gateway.session_context import get_session_env + seen = {} + + def mock_run_job(job): + origin = job.get("origin", {}) + # run_job sets ContextVars — verify each job sees its own + from gateway.session_context import set_session_vars, clear_session_vars + tokens = set_session_vars( + platform=origin.get("platform", ""), + chat_id=str(origin.get("chat_id", "")), + ) + import time + time.sleep(0.05) # give other thread time to set its vars + platform = get_session_env("HERMES_SESSION_PLATFORM") + chat_id = get_session_env("HERMES_SESSION_CHAT_ID") + seen[job["id"]] = {"platform": platform, "chat_id": chat_id} + clear_session_vars(tokens) + return (True, "output", "response", None) + + jobs = [ + {"id": "tg-job", "name": "tg", "deliver": "local", + "origin": {"platform": "telegram", "chat_id": "111"}}, + {"id": "dc-job", "name": "dc", "deliver": "local", + "origin": {"platform": "discord", "chat_id": "222"}}, + ] + + with patch("cron.scheduler.get_due_jobs", return_value=jobs), \ + patch("cron.scheduler.advance_next_run"), \ + patch("cron.scheduler.run_job", side_effect=mock_run_job), \ + patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \ + patch("cron.scheduler._deliver_result", return_value=None), \ + patch("cron.scheduler.mark_job_run"): + from cron.scheduler import tick + tick(verbose=False) + + assert seen["tg-job"] == {"platform": "telegram", "chat_id": "111"} + assert seen["dc-job"] == {"platform": "discord", "chat_id": "222"} + + def test_max_parallel_env_var(self, monkeypatch): + """HERMES_CRON_MAX_PARALLEL=1 should restore serial behaviour.""" + monkeypatch.setenv("HERMES_CRON_MAX_PARALLEL", "1") + call_times = [] + + def mock_run_job(job): + import time + call_times.append(("start", job["id"], time.monotonic())) + time.sleep(0.05) + call_times.append(("end", job["id"], time.monotonic())) + return (True, "output", "response", None) + + jobs = [ + {"id": "s1", "name": "s1", "deliver": "local"}, + {"id": "s2", "name": "s2", "deliver": "local"}, + ] + + with patch("cron.scheduler.get_due_jobs", return_value=jobs), \ + patch("cron.scheduler.advance_next_run"), \ + patch("cron.scheduler.run_job", side_effect=mock_run_job), \ + patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \ + patch("cron.scheduler._deliver_result", return_value=None), \ + patch("cron.scheduler.mark_job_run"): + from cron.scheduler import tick + result = tick(verbose=False) + + assert result == 2 + # With max_workers=1, second job starts after first ends + end_s1 = [t for action, jid, t in call_times if action == "end" and jid == "s1"][0] + start_s2 = [t for action, jid, t in call_times if action == "start" and jid == "s2"][0] + assert start_s2 >= end_s1, "Jobs ran concurrently despite max_parallel=1" diff --git a/tools/send_message_tool.py b/tools/send_message_tool.py index 534426607..1a344c534 100644 --- a/tools/send_message_tool.py +++ b/tools/send_message_tool.py @@ -359,11 +359,12 @@ def _describe_media_for_mirror(media_files): def _get_cron_auto_delivery_target(): """Return the cron scheduler's auto-delivery target for the current run, if any.""" - platform = os.getenv("HERMES_CRON_AUTO_DELIVER_PLATFORM", "").strip().lower() - chat_id = os.getenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID", "").strip() + from gateway.session_context import get_session_env + platform = get_session_env("HERMES_CRON_AUTO_DELIVER_PLATFORM", "").strip().lower() + chat_id = get_session_env("HERMES_CRON_AUTO_DELIVER_CHAT_ID", "").strip() if not platform or not chat_id: return None - thread_id = os.getenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID", "").strip() or None + thread_id = get_session_env("HERMES_CRON_AUTO_DELIVER_THREAD_ID", "").strip() or None return { "platform": platform, "chat_id": chat_id,