fix(cron): preserve skill env passthrough in worker thread

This commit is contained in:
helix4u 2026-04-10 21:32:24 -06:00 committed by kshitij
parent 422f2866e6
commit aa398ad655
2 changed files with 58 additions and 1 deletions

View file

@ -10,6 +10,7 @@ runs at a time if multiple processes overlap.
import asyncio import asyncio
import concurrent.futures import concurrent.futures
import contextvars
import json import json
import logging import logging
import os import os
@ -770,7 +771,11 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
_cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None _cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None
_POLL_INTERVAL = 5.0 _POLL_INTERVAL = 5.0
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) _cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
_cron_future = _cron_pool.submit(agent.run_conversation, prompt) # Preserve scheduler-scoped ContextVar state (for example skill-declared
# env passthrough registrations) when the cron run hops into the worker
# thread used for inactivity timeout monitoring.
_cron_context = contextvars.copy_context()
_cron_future = _cron_pool.submit(_cron_context.run, agent.run_conversation, prompt)
_inactivity_timeout = False _inactivity_timeout = False
try: try:
if _cron_inactivity_limit is None: if _cron_inactivity_limit is None:

View file

@ -8,6 +8,7 @@ from unittest.mock import AsyncMock, patch, MagicMock
import pytest import pytest
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, _send_media_via_adapter, run_job, SILENT_MARKER, _build_job_prompt from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, _send_media_via_adapter, run_job, SILENT_MARKER, _build_job_prompt
from tools.env_passthrough import clear_env_passthrough
class TestResolveOrigin: class TestResolveOrigin:
@ -877,6 +878,57 @@ class TestRunJobPerJobOverrides:
class TestRunJobSkillBacked: class TestRunJobSkillBacked:
def test_run_job_preserves_skill_env_passthrough_into_worker_thread(self, tmp_path):
job = {
"id": "skill-env-job",
"name": "skill env test",
"prompt": "Use the skill.",
"skill": "notion",
}
fake_db = MagicMock()
def _skill_view(name):
assert name == "notion"
from tools.env_passthrough import register_env_passthrough
register_env_passthrough(["NOTION_API_KEY"])
return json.dumps({"success": True, "content": "# notion\nUse Notion."})
def _run_conversation(prompt):
from tools.env_passthrough import get_all_passthrough
assert "NOTION_API_KEY" in get_all_passthrough()
return {"final_response": "ok"}
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "***",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("tools.skills_tool.skill_view", side_effect=_skill_view), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.side_effect = _run_conversation
mock_agent_cls.return_value = mock_agent
try:
success, output, final_response, error = run_job(job)
finally:
clear_env_passthrough()
assert success is True
assert error is None
assert final_response == "ok"
def test_run_job_loads_skill_and_disables_recursive_cron_tools(self, tmp_path): def test_run_job_loads_skill_and_disables_recursive_cron_tools(self, tmp_path):
job = { job = {
"id": "skill-job", "id": "skill-job",