fix(cron): run due jobs in parallel to prevent serial tick starvation (#13021)

Replaces the serial for-loop in tick() with ThreadPoolExecutor so all
jobs due in a single tick run concurrently. A slow job no longer blocks
others from executing, fixing silent job skipping (issue #9086).

Thread safety:
- Session/delivery env vars migrated from os.environ to ContextVars
  (gateway/session_context.py) so parallel jobs can't clobber each
  other's delivery targets. Each thread gets its own copied context.
- jobs.json read-modify-write cycles (advance_next_run, mark_job_run)
  protected by threading.Lock to prevent concurrent save clobber.
- send_message_tool reads delivery vars via get_session_env() for
  ContextVar-aware resolution with os.environ fallback.

Configuration:
- cron.max_parallel_jobs in config.yaml (null = unbounded, 1 = serial)
- HERMES_CRON_MAX_PARALLEL env var override

Based on PR #9169 by @VenomMoth1.

Fixes #9086
This commit is contained in:
Teknium 2026-04-20 11:53:07 -07:00 committed by GitHub
parent d587d62eba
commit c86915024e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 259 additions and 81 deletions

View file

@ -9,6 +9,7 @@ import copy
import json import json
import logging import logging
import tempfile import tempfile
import threading
import os import os
import re import re
import uuid import uuid
@ -34,6 +35,11 @@ except ImportError:
HERMES_DIR = get_hermes_home().resolve() HERMES_DIR = get_hermes_home().resolve()
CRON_DIR = HERMES_DIR / "cron" CRON_DIR = HERMES_DIR / "cron"
JOBS_FILE = CRON_DIR / "jobs.json" JOBS_FILE = CRON_DIR / "jobs.json"
# In-process lock protecting load_jobs→modify→save_jobs cycles.
# Required when tick() runs jobs in parallel threads — without this,
# concurrent mark_job_run / advance_next_run calls can clobber each other.
_jobs_file_lock = threading.Lock()
OUTPUT_DIR = CRON_DIR / "output" OUTPUT_DIR = CRON_DIR / "output"
ONESHOT_GRACE_SECONDS = 120 ONESHOT_GRACE_SECONDS = 120
@ -594,6 +600,7 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None,
``delivery_error`` is tracked separately from the agent error a job ``delivery_error`` is tracked separately from the agent error a job
can succeed (agent produced output) but fail delivery (platform down). can succeed (agent produced output) but fail delivery (platform down).
""" """
with _jobs_file_lock:
jobs = load_jobs() jobs = load_jobs()
for i, job in enumerate(jobs): for i, job in enumerate(jobs):
if job["id"] == job_id: if job["id"] == job_id:
@ -645,6 +652,7 @@ def advance_next_run(job_id: str) -> bool:
Returns True if next_run_at was advanced, False otherwise. Returns True if next_run_at was advanced, False otherwise.
""" """
with _jobs_file_lock:
jobs = load_jobs() jobs = load_jobs()
for job in jobs: for job in jobs:
if job["id"] == job_id: if job["id"] == job_id:

View file

@ -747,14 +747,17 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# scheduler process — every job this process runs is a cron job. # scheduler process — every job this process runs is a cron job.
os.environ["HERMES_CRON_SESSION"] = "1" os.environ["HERMES_CRON_SESSION"] = "1"
# Use ContextVars for per-job session/delivery state so parallel jobs
# don't clobber each other's targets (os.environ is process-global).
from gateway.session_context import set_session_vars, clear_session_vars, _VAR_MAP
_ctx_tokens = set_session_vars(
platform=origin["platform"] if origin else "",
chat_id=str(origin["chat_id"]) if origin else "",
chat_name=origin.get("chat_name", "") if origin else "",
)
try: try:
# Inject origin context so the agent's send_message tool knows the chat.
# Must be INSIDE the try block so the finally cleanup always runs.
if origin:
os.environ["HERMES_SESSION_PLATFORM"] = origin["platform"]
os.environ["HERMES_SESSION_CHAT_ID"] = str(origin["chat_id"])
if origin.get("chat_name"):
os.environ["HERMES_SESSION_CHAT_NAME"] = origin["chat_name"]
# Re-read .env and config.yaml fresh every run so provider/key # Re-read .env and config.yaml fresh every run so provider/key
# changes take effect without a gateway restart. # changes take effect without a gateway restart.
from dotenv import load_dotenv from dotenv import load_dotenv
@ -765,10 +768,10 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
delivery_target = _resolve_delivery_target(job) delivery_target = _resolve_delivery_target(job)
if delivery_target: if delivery_target:
os.environ["HERMES_CRON_AUTO_DELIVER_PLATFORM"] = delivery_target["platform"] _VAR_MAP["HERMES_CRON_AUTO_DELIVER_PLATFORM"].set(delivery_target["platform"])
os.environ["HERMES_CRON_AUTO_DELIVER_CHAT_ID"] = str(delivery_target["chat_id"]) _VAR_MAP["HERMES_CRON_AUTO_DELIVER_CHAT_ID"].set(str(delivery_target["chat_id"]))
if delivery_target.get("thread_id") is not None: if delivery_target.get("thread_id") is not None:
os.environ["HERMES_CRON_AUTO_DELIVER_THREAD_ID"] = str(delivery_target["thread_id"]) _VAR_MAP["HERMES_CRON_AUTO_DELIVER_THREAD_ID"].set(str(delivery_target["thread_id"]))
model = job.get("model") or os.getenv("HERMES_MODEL") or "" model = job.get("model") or os.getenv("HERMES_MODEL") or ""
@ -1012,16 +1015,8 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
return False, output, "", error_msg return False, output, "", error_msg
finally: finally:
# Clean up injected env vars so they don't leak to other jobs # Clean up ContextVar session/delivery state for this job.
for key in ( clear_session_vars(_ctx_tokens)
"HERMES_SESSION_PLATFORM",
"HERMES_SESSION_CHAT_ID",
"HERMES_SESSION_CHAT_NAME",
"HERMES_CRON_AUTO_DELIVER_PLATFORM",
"HERMES_CRON_AUTO_DELIVER_CHAT_ID",
"HERMES_CRON_AUTO_DELIVER_THREAD_ID",
):
os.environ.pop(key, None)
if _session_db: if _session_db:
try: try:
_session_db.end_session(_cron_session_id, "cron_complete") _session_db.end_session(_cron_session_id, "cron_complete")
@ -1074,15 +1069,42 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
if verbose: if verbose:
logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs)) logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
executed = 0 # Advance next_run_at for all recurring jobs FIRST, under the file lock,
# before any execution begins. This preserves at-most-once semantics.
for job in due_jobs: for job in due_jobs:
try:
# For recurring jobs (cron/interval), advance next_run_at to the
# next future occurrence BEFORE execution. This way, if the
# process crashes mid-run, the job won't re-fire on restart.
# One-shot jobs are left alone so they can retry on restart.
advance_next_run(job["id"]) advance_next_run(job["id"])
# Resolve max parallel workers: env var > config.yaml > unbounded.
# Set HERMES_CRON_MAX_PARALLEL=1 to restore old serial behaviour.
_max_workers: Optional[int] = None
try:
_env_par = os.getenv("HERMES_CRON_MAX_PARALLEL", "").strip()
if _env_par:
_max_workers = int(_env_par) or None
except (ValueError, TypeError):
logger.warning("Invalid HERMES_CRON_MAX_PARALLEL value; defaulting to unbounded")
if _max_workers is None:
try:
from hermes_cli.config import load_config
_ucfg = load_config() or {}
_cfg_par = (
_ucfg.get("cron", {}) if isinstance(_ucfg, dict) else {}
).get("max_parallel_jobs")
if _cfg_par is not None:
_max_workers = int(_cfg_par) or None
except Exception:
pass
if verbose:
logger.info(
"Running %d job(s) in parallel (max_workers=%s)",
len(due_jobs),
_max_workers if _max_workers else "unbounded",
)
def _process_job(job: dict) -> bool:
"""Run one due job end-to-end: execute, save, deliver, mark."""
try:
success, output, final_response, error = run_job(job) success, output, final_response, error = run_job(job)
output_file = save_job_output(job["id"], output) output_file = save_job_output(job["id"], output)
@ -1114,13 +1136,23 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)" error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)"
mark_job_run(job["id"], success, error, delivery_error=delivery_error) mark_job_run(job["id"], success, error, delivery_error=delivery_error)
executed += 1 return True
except Exception as e: except Exception as e:
logger.error("Error processing job %s: %s", job['id'], e) logger.error("Error processing job %s: %s", job['id'], e)
mark_job_run(job["id"], False, str(e)) mark_job_run(job["id"], False, str(e))
return False
return executed # Run all due jobs concurrently, each in its own ContextVar copy
# so session/delivery state stays isolated per-thread.
with concurrent.futures.ThreadPoolExecutor(max_workers=_max_workers) as _tick_pool:
_futures = []
for job in due_jobs:
_ctx = contextvars.copy_context()
_futures.append(_tick_pool.submit(_ctx.run, _process_job, job))
_results = [f.result() for f in _futures]
return sum(_results)
finally: finally:
if fcntl: if fcntl:
fcntl.flock(lock_fd, fcntl.LOCK_UN) fcntl.flock(lock_fd, fcntl.LOCK_UN)

View file

@ -56,6 +56,12 @@ _SESSION_USER_ID: ContextVar = ContextVar("HERMES_SESSION_USER_ID", default=_UNS
_SESSION_USER_NAME: ContextVar = ContextVar("HERMES_SESSION_USER_NAME", default=_UNSET) _SESSION_USER_NAME: ContextVar = ContextVar("HERMES_SESSION_USER_NAME", default=_UNSET)
_SESSION_KEY: ContextVar = ContextVar("HERMES_SESSION_KEY", default=_UNSET) _SESSION_KEY: ContextVar = ContextVar("HERMES_SESSION_KEY", default=_UNSET)
# Cron auto-delivery vars — set per-job in run_job() so concurrent jobs
# don't clobber each other's delivery targets.
_CRON_AUTO_DELIVER_PLATFORM: ContextVar = ContextVar("HERMES_CRON_AUTO_DELIVER_PLATFORM", default=_UNSET)
_CRON_AUTO_DELIVER_CHAT_ID: ContextVar = ContextVar("HERMES_CRON_AUTO_DELIVER_CHAT_ID", default=_UNSET)
_CRON_AUTO_DELIVER_THREAD_ID: ContextVar = ContextVar("HERMES_CRON_AUTO_DELIVER_THREAD_ID", default=_UNSET)
_VAR_MAP = { _VAR_MAP = {
"HERMES_SESSION_PLATFORM": _SESSION_PLATFORM, "HERMES_SESSION_PLATFORM": _SESSION_PLATFORM,
"HERMES_SESSION_CHAT_ID": _SESSION_CHAT_ID, "HERMES_SESSION_CHAT_ID": _SESSION_CHAT_ID,
@ -64,6 +70,9 @@ _VAR_MAP = {
"HERMES_SESSION_USER_ID": _SESSION_USER_ID, "HERMES_SESSION_USER_ID": _SESSION_USER_ID,
"HERMES_SESSION_USER_NAME": _SESSION_USER_NAME, "HERMES_SESSION_USER_NAME": _SESSION_USER_NAME,
"HERMES_SESSION_KEY": _SESSION_KEY, "HERMES_SESSION_KEY": _SESSION_KEY,
"HERMES_CRON_AUTO_DELIVER_PLATFORM": _CRON_AUTO_DELIVER_PLATFORM,
"HERMES_CRON_AUTO_DELIVER_CHAT_ID": _CRON_AUTO_DELIVER_CHAT_ID,
"HERMES_CRON_AUTO_DELIVER_THREAD_ID": _CRON_AUTO_DELIVER_THREAD_ID,
} }

View file

@ -796,6 +796,11 @@ DEFAULT_CONFIG = {
# Wrap delivered cron responses with a header (task name) and footer # Wrap delivered cron responses with a header (task name) and footer
# ("The agent cannot see this message"). Set to false for clean output. # ("The agent cannot see this message"). Set to false for clean output.
"wrap_response": True, "wrap_response": True,
# Maximum number of due jobs to run in parallel per tick.
# null/0 = unbounded (limited only by thread count).
# 1 = serial (pre-v0.9 behaviour).
# Also overridable via HERMES_CRON_MAX_PARALLEL env var.
"max_parallel_jobs": None,
}, },
# execute_code settings — controls the tool used for programmatic tool calls. # execute_code settings — controls the tool used for programmatic tool calls.

View file

@ -772,9 +772,10 @@ class TestRunJobSessionPersistence:
pass pass
def run_conversation(self, *args, **kwargs): def run_conversation(self, *args, **kwargs):
seen["platform"] = os.getenv("HERMES_CRON_AUTO_DELIVER_PLATFORM") from gateway.session_context import get_session_env
seen["chat_id"] = os.getenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID") seen["platform"] = get_session_env("HERMES_CRON_AUTO_DELIVER_PLATFORM") or None
seen["thread_id"] = os.getenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID") seen["chat_id"] = get_session_env("HERMES_CRON_AUTO_DELIVER_CHAT_ID") or None
seen["thread_id"] = get_session_env("HERMES_CRON_AUTO_DELIVER_THREAD_ID") or None
return {"final_response": "ok"} return {"final_response": "ok"}
with patch("cron.scheduler._hermes_home", tmp_path), \ with patch("cron.scheduler._hermes_home", tmp_path), \
@ -1457,3 +1458,125 @@ class TestSendMediaViaAdapter:
self._run_with_loop(adapter, "123", media_files, None, {"id": "j3"}) self._run_with_loop(adapter, "123", media_files, None, {"id": "j3"})
adapter.send_voice.assert_called_once() adapter.send_voice.assert_called_once()
adapter.send_image_file.assert_called_once() adapter.send_image_file.assert_called_once()
class TestParallelTick:
"""Verify that tick() runs due jobs concurrently and isolates ContextVars."""
@pytest.fixture(autouse=True)
def _isolate_tick_lock(self, tmp_path):
"""Point the tick file lock at a per-test temp dir to avoid xdist contention."""
lock_dir = tmp_path / "cron"
lock_dir.mkdir()
with patch("cron.scheduler._LOCK_DIR", lock_dir), \
patch("cron.scheduler._LOCK_FILE", lock_dir / ".tick.lock"):
yield
def test_parallel_jobs_run_concurrently(self):
"""Two jobs launched in the same tick should overlap in time."""
import threading
import time
barrier = threading.Barrier(2, timeout=5)
call_order = []
def mock_run_job(job):
"""Each job hits a barrier — both must be active simultaneously."""
call_order.append(("start", job["id"]))
barrier.wait() # blocks until both threads reach here
call_order.append(("end", job["id"]))
return (True, "output", "response", None)
jobs = [
{"id": "job-a", "name": "a", "deliver": "local"},
{"id": "job-b", "name": "b", "deliver": "local"},
]
with patch("cron.scheduler.get_due_jobs", return_value=jobs), \
patch("cron.scheduler.advance_next_run"), \
patch("cron.scheduler.run_job", side_effect=mock_run_job), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result", return_value=None), \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
result = tick(verbose=False)
assert result == 2
# Both starts happened before both ends — proof of concurrency
starts = [i for i, (action, _) in enumerate(call_order) if action == "start"]
ends = [i for i, (action, _) in enumerate(call_order) if action == "end"]
assert len(starts) == 2
assert len(ends) == 2
assert max(starts) < min(ends), f"Jobs not concurrent: {call_order}"
def test_parallel_jobs_isolated_contextvars(self):
"""Each job's ContextVars must be isolated — no cross-contamination."""
from gateway.session_context import get_session_env
seen = {}
def mock_run_job(job):
origin = job.get("origin", {})
# run_job sets ContextVars — verify each job sees its own
from gateway.session_context import set_session_vars, clear_session_vars
tokens = set_session_vars(
platform=origin.get("platform", ""),
chat_id=str(origin.get("chat_id", "")),
)
import time
time.sleep(0.05) # give other thread time to set its vars
platform = get_session_env("HERMES_SESSION_PLATFORM")
chat_id = get_session_env("HERMES_SESSION_CHAT_ID")
seen[job["id"]] = {"platform": platform, "chat_id": chat_id}
clear_session_vars(tokens)
return (True, "output", "response", None)
jobs = [
{"id": "tg-job", "name": "tg", "deliver": "local",
"origin": {"platform": "telegram", "chat_id": "111"}},
{"id": "dc-job", "name": "dc", "deliver": "local",
"origin": {"platform": "discord", "chat_id": "222"}},
]
with patch("cron.scheduler.get_due_jobs", return_value=jobs), \
patch("cron.scheduler.advance_next_run"), \
patch("cron.scheduler.run_job", side_effect=mock_run_job), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result", return_value=None), \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
tick(verbose=False)
assert seen["tg-job"] == {"platform": "telegram", "chat_id": "111"}
assert seen["dc-job"] == {"platform": "discord", "chat_id": "222"}
def test_max_parallel_env_var(self, monkeypatch):
"""HERMES_CRON_MAX_PARALLEL=1 should restore serial behaviour."""
monkeypatch.setenv("HERMES_CRON_MAX_PARALLEL", "1")
call_times = []
def mock_run_job(job):
import time
call_times.append(("start", job["id"], time.monotonic()))
time.sleep(0.05)
call_times.append(("end", job["id"], time.monotonic()))
return (True, "output", "response", None)
jobs = [
{"id": "s1", "name": "s1", "deliver": "local"},
{"id": "s2", "name": "s2", "deliver": "local"},
]
with patch("cron.scheduler.get_due_jobs", return_value=jobs), \
patch("cron.scheduler.advance_next_run"), \
patch("cron.scheduler.run_job", side_effect=mock_run_job), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result", return_value=None), \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
result = tick(verbose=False)
assert result == 2
# With max_workers=1, second job starts after first ends
end_s1 = [t for action, jid, t in call_times if action == "end" and jid == "s1"][0]
start_s2 = [t for action, jid, t in call_times if action == "start" and jid == "s2"][0]
assert start_s2 >= end_s1, "Jobs ran concurrently despite max_parallel=1"

View file

@ -359,11 +359,12 @@ def _describe_media_for_mirror(media_files):
def _get_cron_auto_delivery_target(): def _get_cron_auto_delivery_target():
"""Return the cron scheduler's auto-delivery target for the current run, if any.""" """Return the cron scheduler's auto-delivery target for the current run, if any."""
platform = os.getenv("HERMES_CRON_AUTO_DELIVER_PLATFORM", "").strip().lower() from gateway.session_context import get_session_env
chat_id = os.getenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID", "").strip() platform = get_session_env("HERMES_CRON_AUTO_DELIVER_PLATFORM", "").strip().lower()
chat_id = get_session_env("HERMES_CRON_AUTO_DELIVER_CHAT_ID", "").strip()
if not platform or not chat_id: if not platform or not chat_id:
return None return None
thread_id = os.getenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID", "").strip() or None thread_id = get_session_env("HERMES_CRON_AUTO_DELIVER_THREAD_ID", "").strip() or None
return { return {
"platform": platform, "platform": platform,
"chat_id": chat_id, "chat_id": chat_id,