hermes-agent/tests/cron/test_scheduler.py
2026-04-24 19:26:13 -05:00

1821 lines
74 KiB
Python

"""Tests for cron/scheduler.py — origin resolution, delivery routing, and error logging."""
import json
import logging
import os
from unittest.mock import AsyncMock, patch, MagicMock
import pytest
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, _send_media_via_adapter, run_job, SILENT_MARKER, _build_job_prompt
from tools.env_passthrough import clear_env_passthrough
from tools.credential_files import clear_credential_files
class TestResolveOrigin:
def test_full_origin(self):
job = {
"origin": {
"platform": "telegram",
"chat_id": "123456",
"chat_name": "Test Chat",
"thread_id": "42",
}
}
result = _resolve_origin(job)
assert isinstance(result, dict)
assert result == job["origin"]
assert result["platform"] == "telegram"
assert result["chat_id"] == "123456"
assert result["chat_name"] == "Test Chat"
assert result["thread_id"] == "42"
def test_no_origin(self):
assert _resolve_origin({}) is None
assert _resolve_origin({"origin": None}) is None
def test_missing_platform(self):
job = {"origin": {"chat_id": "123"}}
assert _resolve_origin(job) is None
def test_missing_chat_id(self):
job = {"origin": {"platform": "telegram"}}
assert _resolve_origin(job) is None
def test_empty_origin(self):
job = {"origin": {}}
assert _resolve_origin(job) is None
class TestResolveDeliveryTarget:
def test_origin_delivery_preserves_thread_id(self):
job = {
"deliver": "origin",
"origin": {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
},
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
}
@pytest.mark.parametrize(
("platform", "env_var", "chat_id"),
[
("matrix", "MATRIX_HOME_ROOM", "!bot-room:example.org"),
("signal", "SIGNAL_HOME_CHANNEL", "+15551234567"),
("mattermost", "MATTERMOST_HOME_CHANNEL", "team-town-square"),
("sms", "SMS_HOME_CHANNEL", "+15557654321"),
("email", "EMAIL_HOME_ADDRESS", "home@example.com"),
("dingtalk", "DINGTALK_HOME_CHANNEL", "cidNNN"),
("feishu", "FEISHU_HOME_CHANNEL", "oc_home"),
("wecom", "WECOM_HOME_CHANNEL", "wecom-home"),
("weixin", "WEIXIN_HOME_CHANNEL", "wxid_home"),
("qqbot", "QQ_HOME_CHANNEL", "group-openid-home"),
],
)
def test_origin_delivery_without_origin_falls_back_to_supported_home_channels(
self, monkeypatch, platform, env_var, chat_id
):
for fallback_env in (
"MATRIX_HOME_ROOM",
"MATRIX_HOME_CHANNEL",
"TELEGRAM_HOME_CHANNEL",
"DISCORD_HOME_CHANNEL",
"SLACK_HOME_CHANNEL",
"SIGNAL_HOME_CHANNEL",
"MATTERMOST_HOME_CHANNEL",
"SMS_HOME_CHANNEL",
"EMAIL_HOME_ADDRESS",
"DINGTALK_HOME_CHANNEL",
"BLUEBUBBLES_HOME_CHANNEL",
"FEISHU_HOME_CHANNEL",
"WECOM_HOME_CHANNEL",
"WEIXIN_HOME_CHANNEL",
"QQ_HOME_CHANNEL",
):
monkeypatch.delenv(fallback_env, raising=False)
monkeypatch.setenv(env_var, chat_id)
assert _resolve_delivery_target({"deliver": "origin"}) == {
"platform": platform,
"chat_id": chat_id,
"thread_id": None,
}
def test_bare_matrix_delivery_uses_matrix_home_room(self, monkeypatch):
monkeypatch.delenv("MATRIX_HOME_CHANNEL", raising=False)
monkeypatch.setenv("MATRIX_HOME_ROOM", "!room123:example.org")
assert _resolve_delivery_target({"deliver": "matrix"}) == {
"platform": "matrix",
"chat_id": "!room123:example.org",
"thread_id": None,
}
def test_explicit_telegram_topic_target_with_thread_id(self):
"""deliver: 'telegram:chat_id:thread_id' parses correctly."""
job = {
"deliver": "telegram:-1003724596514:17",
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-1003724596514",
"thread_id": "17",
}
def test_explicit_telegram_chat_id_without_thread_id(self):
"""deliver: 'telegram:chat_id' sets thread_id to None."""
job = {
"deliver": "telegram:-1003724596514",
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-1003724596514",
"thread_id": None,
}
def test_human_friendly_label_resolved_via_channel_directory(self):
"""deliver: 'whatsapp:Alice (dm)' resolves to the real JID."""
job = {"deliver": "whatsapp:Alice (dm)"}
with patch(
"gateway.channel_directory.resolve_channel_name",
return_value="12345678901234@lid",
) as resolve_mock:
result = _resolve_delivery_target(job)
resolve_mock.assert_called_once_with("whatsapp", "Alice (dm)")
assert result == {
"platform": "whatsapp",
"chat_id": "12345678901234@lid",
"thread_id": None,
}
def test_human_friendly_label_without_suffix_resolved(self):
"""deliver: 'telegram:My Group' resolves without display suffix."""
job = {"deliver": "telegram:My Group"}
with patch(
"gateway.channel_directory.resolve_channel_name",
return_value="-1009999",
):
result = _resolve_delivery_target(job)
assert result == {
"platform": "telegram",
"chat_id": "-1009999",
"thread_id": None,
}
def test_human_friendly_topic_label_preserves_thread_id(self):
"""Resolved Telegram topic labels should split chat_id and thread_id."""
job = {"deliver": "telegram:Coaching Chat / topic 17585 (group)"}
with patch(
"gateway.channel_directory.resolve_channel_name",
return_value="-1009999:17585",
):
result = _resolve_delivery_target(job)
assert result == {
"platform": "telegram",
"chat_id": "-1009999",
"thread_id": "17585",
}
def test_raw_id_not_mangled_when_directory_returns_none(self):
"""deliver: 'whatsapp:12345@lid' passes through when directory has no match."""
job = {"deliver": "whatsapp:12345@lid"}
with patch(
"gateway.channel_directory.resolve_channel_name",
return_value=None,
):
result = _resolve_delivery_target(job)
assert result == {
"platform": "whatsapp",
"chat_id": "12345@lid",
"thread_id": None,
}
def test_bare_platform_uses_matching_origin_chat(self):
job = {
"deliver": "telegram",
"origin": {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
},
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
}
def test_bare_platform_falls_back_to_home_channel(self, monkeypatch):
monkeypatch.setenv("TELEGRAM_HOME_CHANNEL", "-2002")
job = {
"deliver": "telegram",
"origin": {
"platform": "discord",
"chat_id": "abc",
},
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-2002",
"thread_id": None,
}
def test_explicit_discord_topic_target_with_thread_id(self):
"""deliver: 'discord:chat_id:thread_id' parses correctly."""
job = {
"deliver": "discord:-1001234567890:17585",
}
assert _resolve_delivery_target(job) == {
"platform": "discord",
"chat_id": "-1001234567890",
"thread_id": "17585",
}
def test_explicit_discord_chat_id_without_thread_id(self):
"""deliver: 'discord:chat_id' sets thread_id to None."""
job = {
"deliver": "discord:9876543210",
}
assert _resolve_delivery_target(job) == {
"platform": "discord",
"chat_id": "9876543210",
"thread_id": None,
}
def test_explicit_discord_channel_without_thread(self):
"""deliver: 'discord:1001234567890' resolves via explicit platform:chat_id path."""
job = {
"deliver": "discord:1001234567890",
}
result = _resolve_delivery_target(job)
assert result == {
"platform": "discord",
"chat_id": "1001234567890",
"thread_id": None,
}
class TestDeliverResultWrapping:
"""Verify that cron deliveries are wrapped with header/footer and no longer mirrored."""
def test_delivery_wraps_content_with_header_and_footer(self):
"""Delivered content should include task name header and agent-invisible note."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock:
job = {
"id": "test-job",
"name": "daily-report",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Here is today's summary.")
send_mock.assert_called_once()
sent_content = send_mock.call_args.kwargs.get("content") or send_mock.call_args[0][-1]
assert "Cronjob Response: daily-report" in sent_content
assert "(job_id: test-job)" in sent_content
assert "-------------" in sent_content
assert "Here is today's summary." in sent_content
assert "To stop or manage this job" in sent_content
def test_delivery_uses_job_id_when_no_name(self):
"""When a job has no name, the wrapper should fall back to job id."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock:
job = {
"id": "abc-123",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Output.")
sent_content = send_mock.call_args.kwargs.get("content") or send_mock.call_args[0][-1]
assert "Cronjob Response: abc-123" in sent_content
def test_delivery_skips_wrapping_when_config_disabled(self):
"""When cron.wrap_response is false, deliver raw content without header/footer."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}):
job = {
"id": "test-job",
"name": "daily-report",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Clean output only.")
send_mock.assert_called_once()
sent_content = send_mock.call_args.kwargs.get("content") or send_mock.call_args[0][-1]
assert sent_content == "Clean output only."
assert "Cronjob Response" not in sent_content
assert "The agent cannot see" not in sent_content
def test_delivery_extracts_media_tags_before_send(self):
"""Cron delivery should pass MEDIA attachments separately to the send helper."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}):
job = {
"id": "voice-job",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Title\nMEDIA:/tmp/test-voice.ogg")
send_mock.assert_called_once()
args, kwargs = send_mock.call_args
# Text content should have MEDIA: tag stripped
assert "MEDIA:" not in args[3]
assert "Title" in args[3]
# Media files should be forwarded separately
assert kwargs["media_files"] == [("/tmp/test-voice.ogg", False)]
def test_live_adapter_sends_media_as_attachments(self):
"""When a live adapter is available, MEDIA files should be sent as native
platform attachments (e.g., Discord voice, Telegram audio) rather than
as literal 'MEDIA:/path' text."""
from gateway.config import Platform
from concurrent.futures import Future
adapter = AsyncMock()
adapter.send.return_value = MagicMock(success=True)
adapter.send_voice.return_value = MagicMock(success=True)
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.DISCORD: pconfig}
loop = MagicMock()
loop.is_running.return_value = True
# run_coroutine_threadsafe returns concurrent.futures.Future (has timeout kwarg)
def fake_run_coro(coro, _loop):
future = Future()
future.set_result(MagicMock(success=True))
coro.close()
return future
job = {
"id": "tts-job",
"deliver": "origin",
"origin": {"platform": "discord", "chat_id": "9876"},
}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
_deliver_result(
job,
"Here is TTS\nMEDIA:/tmp/cron-voice.mp3",
adapters={Platform.DISCORD: adapter},
loop=loop,
)
# Text should be sent without the MEDIA tag
adapter.send.assert_called_once()
text_sent = adapter.send.call_args[0][1]
assert "MEDIA:" not in text_sent
assert "Here is TTS" in text_sent
# Audio file should be sent as a voice attachment
adapter.send_voice.assert_called_once()
voice_call = adapter.send_voice.call_args
assert voice_call[1]["audio_path"] == "/tmp/cron-voice.mp3"
def test_live_adapter_routes_image_to_send_image_file(self):
"""Image MEDIA files should be routed to send_image_file, not send_voice."""
from gateway.config import Platform
from concurrent.futures import Future
adapter = AsyncMock()
adapter.send.return_value = MagicMock(success=True)
adapter.send_image_file.return_value = MagicMock(success=True)
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.DISCORD: pconfig}
loop = MagicMock()
loop.is_running.return_value = True
def fake_run_coro(coro, _loop):
future = Future()
future.set_result(MagicMock(success=True))
coro.close()
return future
job = {
"id": "img-job",
"deliver": "origin",
"origin": {"platform": "discord", "chat_id": "1234"},
}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
_deliver_result(
job,
"Chart attached\nMEDIA:/tmp/chart.png",
adapters={Platform.DISCORD: adapter},
loop=loop,
)
adapter.send_image_file.assert_called_once()
assert adapter.send_image_file.call_args[1]["image_path"] == "/tmp/chart.png"
adapter.send_voice.assert_not_called()
def test_live_adapter_media_only_no_text(self):
"""When content is ONLY a MEDIA tag with no text, media should still be sent."""
from gateway.config import Platform
from concurrent.futures import Future
adapter = AsyncMock()
adapter.send_voice.return_value = MagicMock(success=True)
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
loop = MagicMock()
loop.is_running.return_value = True
def fake_run_coro(coro, _loop):
future = Future()
future.set_result(MagicMock(success=True))
coro.close()
return future
job = {
"id": "voice-only",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "999"},
}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
_deliver_result(
job,
"MEDIA:/tmp/voice.ogg",
adapters={Platform.TELEGRAM: adapter},
loop=loop,
)
# Text send should NOT be called (no text after stripping MEDIA tag)
adapter.send.assert_not_called()
# Audio should still be delivered
adapter.send_voice.assert_called_once()
def test_live_adapter_sends_cleaned_text_not_raw(self):
"""The live adapter path must send cleaned text (MEDIA tags stripped),
not the raw delivery_content with embedded MEDIA: tags."""
from gateway.config import Platform
from concurrent.futures import Future
adapter = AsyncMock()
adapter.send.return_value = MagicMock(success=True)
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
loop = MagicMock()
loop.is_running.return_value = True
def fake_run_coro(coro, _loop):
future = Future()
future.set_result(MagicMock(success=True))
coro.close()
return future
job = {
"id": "img-job",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "555"},
}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
_deliver_result(
job,
"Report\nMEDIA:/tmp/chart.png",
adapters={Platform.TELEGRAM: adapter},
loop=loop,
)
text_sent = adapter.send.call_args[0][1]
assert "MEDIA:" not in text_sent
assert "Report" in text_sent
def test_no_mirror_to_session_call(self):
"""Cron deliveries should NOT mirror into the gateway session."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})), \
patch("gateway.mirror.mirror_to_session") as mirror_mock:
job = {
"id": "test-job",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Hello!")
mirror_mock.assert_not_called()
def test_origin_delivery_preserves_thread_id(self):
"""Origin delivery should forward thread_id to the send helper."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
job = {
"id": "test-job",
"name": "topic-job",
"deliver": "origin",
"origin": {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
},
}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock:
_deliver_result(job, "hello")
send_mock.assert_called_once()
assert send_mock.call_args.kwargs["thread_id"] == "17585"
class TestDeliverResultErrorReturns:
"""Verify _deliver_result returns error strings on failure, None on success."""
def test_returns_error_when_platform_disabled(self):
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = False
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg):
job = {
"id": "disabled",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
result = _deliver_result(job, "Output.")
assert result is not None
assert "not configured" in result
def test_returns_error_for_unresolved_target(self, monkeypatch):
"""Non-local delivery with no resolvable target should return an error."""
monkeypatch.delenv("TELEGRAM_HOME_CHANNEL", raising=False)
job = {"id": "no-target", "deliver": "telegram"}
result = _deliver_result(job, "Output.")
assert result is not None
assert "no delivery target" in result
class TestRunJobSessionPersistence:
def test_run_job_passes_session_db_and_cron_platform(self, tmp_path):
job = {
"id": "test-job",
"name": "test",
"prompt": "hello",
}
fake_db = MagicMock()
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "test-key",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
success, output, final_response, error = run_job(job)
assert success is True
assert error is None
assert final_response == "ok"
assert "ok" in output
kwargs = mock_agent_cls.call_args.kwargs
assert kwargs["session_db"] is fake_db
assert kwargs["platform"] == "cron"
assert kwargs["session_id"].startswith("cron_test-job_")
fake_db.end_session.assert_called_once()
call_args = fake_db.end_session.call_args
assert call_args[0][0].startswith("cron_test-job_")
assert call_args[0][1] == "cron_complete"
fake_db.close.assert_called_once()
def _make_run_job_patches(self, tmp_path):
"""Common patches for run_job tests."""
fake_db = MagicMock()
return fake_db, [
patch("cron.scheduler._hermes_home", tmp_path),
patch("cron.scheduler._resolve_origin", return_value=None),
patch("dotenv.load_dotenv"),
patch("hermes_state.SessionDB", return_value=fake_db),
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "test-key",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
),
]
def test_run_job_passes_enabled_toolsets_to_agent(self, tmp_path):
job = {
"id": "toolset-job",
"name": "test",
"prompt": "hello",
"enabled_toolsets": ["web", "terminal", "file"],
}
fake_db, patches = self._make_run_job_patches(tmp_path)
with patches[0], patches[1], patches[2], patches[3], patches[4], \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
run_job(job)
kwargs = mock_agent_cls.call_args.kwargs
assert kwargs["enabled_toolsets"] == ["web", "terminal", "file"]
def test_run_job_enabled_toolsets_resolves_from_platform_config_when_not_set(self, tmp_path):
"""When a job has no explicit enabled_toolsets, the scheduler now
resolves them from ``hermes tools`` platform config for ``cron``
(PR #14xxx — blanket fix for Norbert's surprise ``moa`` run).
The legacy "pass None → AIAgent loads full default" path is still
reachable, but only when ``_get_platform_tools`` raises (safety net
for any unexpected config shape).
"""
job = {
"id": "no-toolset-job",
"name": "test",
"prompt": "hello",
}
fake_db, patches = self._make_run_job_patches(tmp_path)
with patches[0], patches[1], patches[2], patches[3], patches[4], \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
run_job(job)
kwargs = mock_agent_cls.call_args.kwargs
# Resolution happened — not None, is a list.
assert isinstance(kwargs["enabled_toolsets"], list)
# The cron default is _HERMES_CORE_TOOLS with _DEFAULT_OFF_TOOLSETS
# (``moa``, ``homeassistant``, ``rl``) removed. The most important
# invariant: ``moa`` is NOT in the default cron toolset, so a cron
# run cannot accidentally spin up frontier models.
assert "moa" not in kwargs["enabled_toolsets"]
def test_run_job_per_job_toolsets_win_over_platform_config(self, tmp_path):
"""Per-job enabled_toolsets (via cronjob tool) always take precedence
over the platform-level ``hermes tools`` config."""
job = {
"id": "override-job",
"name": "test",
"prompt": "hello",
"enabled_toolsets": ["terminal"],
}
fake_db, patches = self._make_run_job_patches(tmp_path)
# Even if the user has ``hermes tools`` configured to enable web+file
# for cron, the per-job override wins.
with patches[0], patches[1], patches[2], patches[3], patches[4], \
patch("run_agent.AIAgent") as mock_agent_cls, \
patch(
"hermes_cli.tools_config._get_platform_tools",
return_value={"web", "file"},
):
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
run_job(job)
kwargs = mock_agent_cls.call_args.kwargs
assert kwargs["enabled_toolsets"] == ["terminal"]
def test_run_job_empty_response_returns_empty_not_placeholder(self, tmp_path):
"""Empty final_response should stay empty for delivery logic (issue #2234).
The placeholder '(No response generated)' should only appear in the
output log, not in the returned final_response that's used for delivery.
"""
job = {
"id": "silent-job",
"name": "silent test",
"prompt": "do work via tools only",
}
fake_db = MagicMock()
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "***",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
# Agent did work via tools but returned no text
mock_agent.run_conversation.return_value = {"final_response": ""}
mock_agent_cls.return_value = mock_agent
success, output, final_response, error = run_job(job)
assert success is True
assert error is None
# final_response should be empty for delivery logic to skip
assert final_response == ""
# But the output log should show the placeholder
assert "(No response generated)" in output
def test_tick_marks_empty_response_as_error(self, tmp_path):
"""When run_job returns success=True but final_response is empty,
tick() should mark the job as error so last_status != 'ok'.
(issue #8585)
"""
from cron.scheduler import tick
from cron.jobs import load_jobs, save_jobs
job = {
"id": "empty-job",
"name": "empty-test",
"prompt": "do something",
"schedule": "every 1h",
"enabled": True,
"next_run_at": "2020-01-01T00:00:00",
"deliver": "local",
"last_status": None,
}
fake_db = MagicMock()
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler.get_due_jobs", return_value=[job]), \
patch("cron.scheduler.advance_next_run"), \
patch("cron.scheduler.mark_job_run") as mock_mark, \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("cron.scheduler.run_job", return_value=(True, "output", "", None)):
tick(verbose=False)
# Should be called with success=False because final_response is empty
mock_mark.assert_called_once()
call_args = mock_mark.call_args
assert call_args[0][0] == "empty-job"
assert call_args[0][1] is False # success should be False
assert "empty" in call_args[0][2].lower() # error should mention empty
def test_run_job_sets_auto_delivery_env_from_dotenv_home_channel(self, tmp_path, monkeypatch):
job = {
"id": "test-job",
"name": "test",
"prompt": "hello",
"deliver": "telegram",
}
fake_db = MagicMock()
seen = {}
(tmp_path / ".env").write_text("TELEGRAM_HOME_CHANNEL=-2002\n")
monkeypatch.delenv("TELEGRAM_HOME_CHANNEL", raising=False)
monkeypatch.delenv("HERMES_CRON_AUTO_DELIVER_PLATFORM", raising=False)
monkeypatch.delenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID", raising=False)
monkeypatch.delenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID", raising=False)
class FakeAgent:
def __init__(self, *args, **kwargs):
pass
def run_conversation(self, *args, **kwargs):
from gateway.session_context import get_session_env
seen["platform"] = get_session_env("HERMES_CRON_AUTO_DELIVER_PLATFORM") or None
seen["chat_id"] = get_session_env("HERMES_CRON_AUTO_DELIVER_CHAT_ID") or None
seen["thread_id"] = get_session_env("HERMES_CRON_AUTO_DELIVER_THREAD_ID") or None
return {"final_response": "ok"}
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "***",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("run_agent.AIAgent", FakeAgent):
success, output, final_response, error = run_job(job)
assert success is True
assert error is None
assert final_response == "ok"
assert "ok" in output
assert seen == {
"platform": "telegram",
"chat_id": "-2002",
"thread_id": None,
}
assert os.getenv("HERMES_CRON_AUTO_DELIVER_PLATFORM") is None
assert os.getenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID") is None
assert os.getenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID") is None
fake_db.close.assert_called_once()
class TestRunJobConfigLogging:
"""Verify that config.yaml parse failures are logged, not silently swallowed."""
def test_bad_config_yaml_is_logged(self, caplog, tmp_path):
"""When config.yaml is malformed, a warning should be logged."""
bad_yaml = tmp_path / "config.yaml"
bad_yaml.write_text("invalid: yaml: [[[bad")
job = {
"id": "test-job",
"name": "test",
"prompt": "hello",
}
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
with caplog.at_level(logging.WARNING, logger="cron.scheduler"):
run_job(job)
assert any("failed to load config.yaml" in r.message for r in caplog.records), \
f"Expected 'failed to load config.yaml' warning in logs, got: {[r.message for r in caplog.records]}"
def test_bad_prefill_messages_is_logged(self, caplog, tmp_path):
"""When the prefill messages file contains invalid JSON, a warning should be logged."""
# Valid config.yaml that points to a bad prefill file
config_yaml = tmp_path / "config.yaml"
config_yaml.write_text("prefill_messages_file: prefill.json\n")
bad_prefill = tmp_path / "prefill.json"
bad_prefill.write_text("{not valid json!!!")
job = {
"id": "test-job",
"name": "test",
"prompt": "hello",
}
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
with caplog.at_level(logging.WARNING, logger="cron.scheduler"):
run_job(job)
assert any("failed to parse prefill messages" in r.message for r in caplog.records), \
f"Expected 'failed to parse prefill messages' warning in logs, got: {[r.message for r in caplog.records]}"
class TestRunJobSkillBacked:
def test_run_job_preserves_skill_env_passthrough_into_worker_thread(self, tmp_path):
job = {
"id": "skill-env-job",
"name": "skill env test",
"prompt": "Use the skill.",
"skill": "notion",
}
fake_db = MagicMock()
def _skill_view(name):
assert name == "notion"
from tools.env_passthrough import register_env_passthrough
register_env_passthrough(["NOTION_API_KEY"])
return json.dumps({"success": True, "content": "# notion\nUse Notion."})
def _run_conversation(prompt):
from tools.env_passthrough import get_all_passthrough
assert "NOTION_API_KEY" in get_all_passthrough()
return {"final_response": "ok"}
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "***",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("tools.skills_tool.skill_view", side_effect=_skill_view), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.side_effect = _run_conversation
mock_agent_cls.return_value = mock_agent
try:
success, output, final_response, error = run_job(job)
finally:
clear_env_passthrough()
assert success is True
assert error is None
assert final_response == "ok"
def test_run_job_preserves_credential_file_passthrough_into_worker_thread(self, tmp_path):
"""copy_context() also propagates credential_files ContextVar."""
job = {
"id": "cred-env-job",
"name": "cred file test",
"prompt": "Use the skill.",
"skill": "google-workspace",
}
fake_db = MagicMock()
# Create a credential file so register_credential_file succeeds
cred_dir = tmp_path / "credentials"
cred_dir.mkdir()
(cred_dir / "google_token.json").write_text('{"token": "t"}')
def _skill_view(name):
assert name == "google-workspace"
from tools.credential_files import register_credential_file
register_credential_file("credentials/google_token.json")
return json.dumps({"success": True, "content": "# google-workspace\nUse Google."})
def _run_conversation(prompt):
from tools.credential_files import _get_registered
registered = _get_registered()
assert registered, "credential files must be visible in worker thread"
assert any("google_token.json" in v for v in registered.values())
return {"final_response": "ok"}
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("tools.credential_files._resolve_hermes_home", return_value=tmp_path), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "***",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("tools.skills_tool.skill_view", side_effect=_skill_view), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.side_effect = _run_conversation
mock_agent_cls.return_value = mock_agent
try:
success, output, final_response, error = run_job(job)
finally:
clear_credential_files()
assert success is True
assert error is None
assert final_response == "ok"
def test_run_job_loads_skill_and_disables_recursive_cron_tools(self, tmp_path):
job = {
"id": "skill-job",
"name": "skill test",
"prompt": "Check the feeds and summarize anything new.",
"skill": "blogwatcher",
}
fake_db = MagicMock()
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "***",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("tools.skills_tool.skill_view", return_value=json.dumps({"success": True, "content": "# Blogwatcher\nFollow this skill."})), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
success, output, final_response, error = run_job(job)
assert success is True
assert error is None
assert final_response == "ok"
kwargs = mock_agent_cls.call_args.kwargs
assert "cronjob" in (kwargs["disabled_toolsets"] or [])
prompt_arg = mock_agent.run_conversation.call_args.args[0]
assert "blogwatcher" in prompt_arg
assert "Follow this skill" in prompt_arg
assert "Check the feeds and summarize anything new." in prompt_arg
def test_run_job_loads_multiple_skills_in_order(self, tmp_path):
job = {
"id": "multi-skill-job",
"name": "multi skill test",
"prompt": "Combine the results.",
"skills": ["blogwatcher", "maps"],
}
fake_db = MagicMock()
def _skill_view(name):
return json.dumps({"success": True, "content": f"# {name}\nInstructions for {name}."})
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "***",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("tools.skills_tool.skill_view", side_effect=_skill_view) as skill_view_mock, \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
success, output, final_response, error = run_job(job)
assert success is True
assert error is None
assert final_response == "ok"
assert skill_view_mock.call_count == 2
assert [call.args[0] for call in skill_view_mock.call_args_list] == ["blogwatcher", "maps"]
prompt_arg = mock_agent.run_conversation.call_args.args[0]
assert prompt_arg.index("blogwatcher") < prompt_arg.index("maps")
assert "Instructions for blogwatcher." in prompt_arg
assert "Instructions for maps." in prompt_arg
assert "Combine the results." in prompt_arg
class TestSilentDelivery:
"""Verify that [SILENT] responses suppress delivery while still saving output."""
def _make_job(self):
return {
"id": "monitor-job",
"name": "monitor",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
def test_silent_response_suppresses_delivery(self, caplog):
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(True, "# output", "[SILENT]", None)), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
with caplog.at_level(logging.INFO, logger="cron.scheduler"):
tick(verbose=False)
deliver_mock.assert_not_called()
assert any(SILENT_MARKER in r.message for r in caplog.records)
def test_silent_with_note_suppresses_delivery(self):
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(True, "# output", "[SILENT] No changes detected", None)), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
tick(verbose=False)
deliver_mock.assert_not_called()
def test_silent_trailing_suppresses_delivery(self):
"""Agent appended [SILENT] after explanation text — must still suppress."""
response = "2 deals filtered out (like<10, reply<15).\n\n[SILENT]"
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(True, "# output", response, None)), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
tick(verbose=False)
deliver_mock.assert_not_called()
def test_silent_is_case_insensitive(self):
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(True, "# output", "[silent] nothing new", None)), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
tick(verbose=False)
deliver_mock.assert_not_called()
def test_failed_job_always_delivers(self):
"""Failed jobs deliver regardless of [SILENT] in output."""
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(False, "# output", "", "some error")), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
tick(verbose=False)
deliver_mock.assert_called_once()
def test_output_saved_even_when_delivery_suppressed(self):
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(True, "# full output", "[SILENT]", None)), \
patch("cron.scheduler.save_job_output") as save_mock, \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
save_mock.return_value = "/tmp/out.md"
from cron.scheduler import tick
tick(verbose=False)
save_mock.assert_called_once_with("monitor-job", "# full output")
deliver_mock.assert_not_called()
class TestBuildJobPromptSilentHint:
"""Verify _build_job_prompt always injects [SILENT] guidance."""
def test_hint_always_present(self):
job = {"prompt": "Check for updates"}
result = _build_job_prompt(job)
assert "[SILENT]" in result
assert "Check for updates" in result
def test_hint_present_even_without_prompt(self):
job = {"prompt": ""}
result = _build_job_prompt(job)
assert "[SILENT]" in result
def test_delivery_guidance_present(self):
"""Cron hint tells agents their final response is auto-delivered."""
job = {"prompt": "Generate a report"}
result = _build_job_prompt(job)
assert "do NOT use send_message" in result
assert "automatically delivered" in result
def test_delivery_guidance_precedes_user_prompt(self):
"""System guidance appears before the user's prompt text."""
job = {"prompt": "My custom prompt"}
result = _build_job_prompt(job)
system_pos = result.index("do NOT use send_message")
prompt_pos = result.index("My custom prompt")
assert system_pos < prompt_pos
def test_study_role_injects_execution_guidance(self):
job = {"prompt": "Study Chapter 7", "role": "study"}
result = _build_job_prompt(job)
assert "classified as role=study" in result
assert "execution loop, not a passive summary" in result
assert "self_improvement_pipeline" in result
def test_self_improve_role_injects_journal_followthrough_guidance(self):
job = {"prompt": "Run the loop", "role": "self-improve"}
result = _build_job_prompt(job)
assert "classified as role=self-improve" in result
assert "journal_entries stale or missing" in result
assert "do not mask true stale evidence" in result
def test_non_study_role_does_not_inject_role_guidance(self):
job = {"prompt": "Send report", "role": "report"}
result = _build_job_prompt(job)
assert "classified as role=study" not in result
assert "classified as role=self-improve" not in result
class TestParseWakeGate:
"""Unit tests for _parse_wake_gate — pure function, no side effects."""
def test_empty_output_wakes(self):
from cron.scheduler import _parse_wake_gate
assert _parse_wake_gate("") is True
assert _parse_wake_gate(None) is True
def test_whitespace_only_wakes(self):
from cron.scheduler import _parse_wake_gate
assert _parse_wake_gate(" \n\n \t\n") is True
def test_non_json_last_line_wakes(self):
from cron.scheduler import _parse_wake_gate
assert _parse_wake_gate("hello world") is True
assert _parse_wake_gate("line 1\nline 2\nplain text") is True
def test_json_non_dict_wakes(self):
"""Bare arrays, numbers, strings must not be interpreted as a gate."""
from cron.scheduler import _parse_wake_gate
assert _parse_wake_gate("[1, 2, 3]") is True
assert _parse_wake_gate("42") is True
assert _parse_wake_gate('"wakeAgent"') is True
def test_wake_gate_false_skips(self):
from cron.scheduler import _parse_wake_gate
assert _parse_wake_gate('{"wakeAgent": false}') is False
def test_wake_gate_true_wakes(self):
from cron.scheduler import _parse_wake_gate
assert _parse_wake_gate('{"wakeAgent": true}') is True
def test_wake_gate_missing_wakes(self):
"""A JSON dict without a wakeAgent key defaults to waking."""
from cron.scheduler import _parse_wake_gate
assert _parse_wake_gate('{"data": {"foo": "bar"}}') is True
def test_non_boolean_false_still_wakes(self):
"""Only strict ``False`` skips — truthy/falsy shortcuts are too risky."""
from cron.scheduler import _parse_wake_gate
assert _parse_wake_gate('{"wakeAgent": 0}') is True
assert _parse_wake_gate('{"wakeAgent": null}') is True
assert _parse_wake_gate('{"wakeAgent": ""}') is True
def test_only_last_non_empty_line_parsed(self):
from cron.scheduler import _parse_wake_gate
multi = 'some log output\nmore output\n{"wakeAgent": false}'
assert _parse_wake_gate(multi) is False
def test_trailing_blank_lines_ignored(self):
from cron.scheduler import _parse_wake_gate
multi = '{"wakeAgent": false}\n\n\n'
assert _parse_wake_gate(multi) is False
def test_non_last_json_line_does_not_gate(self):
"""A JSON gate on an earlier line with plain text after it does NOT trigger."""
from cron.scheduler import _parse_wake_gate
multi = '{"wakeAgent": false}\nactually this is the real output'
assert _parse_wake_gate(multi) is True
class TestRunJobWakeGate:
"""Integration tests for run_job wake-gate short-circuit."""
@pytest.fixture(autouse=True)
def _stub_runtime_provider(self):
"""Stub ``resolve_runtime_provider`` for wake-gate tests.
``run_job`` resolves the runtime provider BEFORE constructing
``AIAgent``, so these tests must mock ``resolve_runtime_provider``
in addition to ``AIAgent`` — otherwise in a hermetic CI env (no
API keys), the resolver raises and the test fails before the
patched AIAgent is ever reached.
"""
fake_runtime = {
"provider": "openrouter",
"api_mode": "chat_completions",
"base_url": "https://openrouter.ai/api/v1",
"api_key": "test-key",
"source": "stub",
"requested_provider": None,
}
with patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value=fake_runtime,
):
yield
def _make_job(self, name="wake-gate-test", script="check.py"):
"""Minimal valid cron job dict for run_job."""
return {
"id": f"job_{name}",
"name": name,
"prompt": "Do a thing",
"schedule": "*/5 * * * *",
"script": script,
}
def test_wake_false_skips_agent_and_returns_silent(self, caplog):
"""When _run_job_script output ends with {wakeAgent: false}, the agent
is not invoked and run_job returns the SILENT marker so delivery is
suppressed."""
from cron.scheduler import SILENT_MARKER
import cron.scheduler as scheduler
with patch.object(scheduler, "_run_job_script",
return_value=(True, '{"wakeAgent": false}')), \
patch("run_agent.AIAgent") as agent_cls:
success, doc, final, err = scheduler.run_job(self._make_job())
assert success is True
assert err is None
assert final == SILENT_MARKER
assert "Script gate returned `wakeAgent=false`" in doc
agent_cls.assert_not_called()
def test_wake_true_runs_agent_with_injected_output(self):
"""When the script returns {wakeAgent: true, data: ...}, the agent is
invoked and the data line still shows up in the prompt."""
import cron.scheduler as scheduler
script_output = '{"wakeAgent": true, "data": {"new": 3}}'
agent = MagicMock()
agent.run_conversation = MagicMock(return_value={
"final_response": "ok", "messages": []
})
with patch.object(scheduler, "_run_job_script",
return_value=(True, script_output)), \
patch("run_agent.AIAgent", return_value=agent) as agent_cls:
success, doc, final, err = scheduler.run_job(self._make_job())
agent_cls.assert_called_once()
# The script output should be visible in the prompt passed to
# run_conversation.
call_kwargs = agent.run_conversation.call_args
prompt_arg = call_kwargs.args[0] if call_kwargs.args else call_kwargs.kwargs.get("user_message", "")
assert script_output in prompt_arg
assert success is True
assert err is None
def test_script_runs_only_once_on_wake(self):
"""Wake-true path must not re-run the script inside _build_job_prompt
(script would execute twice otherwise, wasting work and risking
double-side-effects)."""
import cron.scheduler as scheduler
call_count = 0
def _script_stub(path):
nonlocal call_count
call_count += 1
return (True, "regular output")
agent = MagicMock()
agent.run_conversation = MagicMock(return_value={
"final_response": "ok", "messages": []
})
with patch.object(scheduler, "_run_job_script", side_effect=_script_stub), \
patch("run_agent.AIAgent", return_value=agent):
scheduler.run_job(self._make_job())
assert call_count == 1, f"script ran {call_count}x, expected exactly 1"
def test_script_failure_does_not_trigger_gate(self):
"""If _run_job_script returns success=False, the gate is NOT evaluated
and the agent still runs (the failure is reported as context)."""
import cron.scheduler as scheduler
# Malicious or broken script whose stderr happens to contain the
# gate JSON — we must NOT honor it because ran_ok is False.
agent = MagicMock()
agent.run_conversation = MagicMock(return_value={
"final_response": "ok", "messages": []
})
with patch.object(scheduler, "_run_job_script",
return_value=(False, '{"wakeAgent": false}')), \
patch("run_agent.AIAgent", return_value=agent) as agent_cls:
success, doc, final, err = scheduler.run_job(self._make_job())
agent_cls.assert_called_once() # Agent DID wake despite the gate-like text
def test_no_script_path_runs_agent_normally(self):
"""Regression: jobs without a script still work."""
import cron.scheduler as scheduler
agent = MagicMock()
agent.run_conversation = MagicMock(return_value={
"final_response": "ok", "messages": []
})
job = self._make_job(script=None)
job.pop("script", None)
with patch.object(scheduler, "_run_job_script") as script_fn, \
patch("run_agent.AIAgent", return_value=agent) as agent_cls:
scheduler.run_job(job)
script_fn.assert_not_called()
agent_cls.assert_called_once()
class TestBuildJobPromptMissingSkill:
"""Verify that a missing skill logs a warning and does not crash the job."""
def _missing_skill_view(self, name: str) -> str:
return json.dumps({"success": False, "error": f"Skill '{name}' not found."})
def test_missing_skill_does_not_raise(self):
"""Job should run even when a referenced skill is not installed."""
with patch("tools.skills_tool.skill_view", side_effect=self._missing_skill_view):
result = _build_job_prompt({"skills": ["ghost-skill"], "prompt": "do something"})
# prompt is preserved even though skill was skipped
assert "do something" in result
def test_missing_skill_injects_user_notice_into_prompt(self):
"""A system notice about the missing skill is injected into the prompt."""
with patch("tools.skills_tool.skill_view", side_effect=self._missing_skill_view):
result = _build_job_prompt({"skills": ["ghost-skill"], "prompt": "do something"})
assert "ghost-skill" in result
assert "not found" in result.lower() or "skipped" in result.lower()
def test_missing_skill_logs_warning(self, caplog):
"""A warning is logged when a skill cannot be found."""
with caplog.at_level(logging.WARNING, logger="cron.scheduler"):
with patch("tools.skills_tool.skill_view", side_effect=self._missing_skill_view):
_build_job_prompt({"name": "My Job", "skills": ["ghost-skill"], "prompt": "do something"})
assert any("ghost-skill" in record.message for record in caplog.records)
def test_valid_skill_loaded_alongside_missing(self):
"""A valid skill is still loaded when another skill in the list is missing."""
def _mixed_skill_view(name: str) -> str:
if name == "real-skill":
return json.dumps({"success": True, "content": "Real skill content."})
return json.dumps({"success": False, "error": f"Skill '{name}' not found."})
with patch("tools.skills_tool.skill_view", side_effect=_mixed_skill_view):
result = _build_job_prompt({"skills": ["ghost-skill", "real-skill"], "prompt": "go"})
assert "Real skill content." in result
assert "go" in result
class TestSendMediaViaAdapter:
"""Unit tests for _send_media_via_adapter — routes files to typed adapter methods."""
@staticmethod
def _run_with_loop(adapter, chat_id, media_files, metadata, job):
"""Helper: run _send_media_via_adapter with a real running event loop."""
import asyncio
import threading
loop = asyncio.new_event_loop()
t = threading.Thread(target=loop.run_forever, daemon=True)
t.start()
try:
_send_media_via_adapter(adapter, chat_id, media_files, metadata, loop, job)
finally:
loop.call_soon_threadsafe(loop.stop)
t.join(timeout=5)
loop.close()
def test_video_dispatched_to_send_video(self):
adapter = MagicMock()
adapter.send_video = AsyncMock()
media_files = [("/tmp/clip.mp4", False)]
self._run_with_loop(adapter, "123", media_files, None, {"id": "j1"})
adapter.send_video.assert_called_once()
assert adapter.send_video.call_args[1]["video_path"] == "/tmp/clip.mp4"
def test_unknown_ext_dispatched_to_send_document(self):
adapter = MagicMock()
adapter.send_document = AsyncMock()
media_files = [("/tmp/report.pdf", False)]
self._run_with_loop(adapter, "123", media_files, None, {"id": "j2"})
adapter.send_document.assert_called_once()
assert adapter.send_document.call_args[1]["file_path"] == "/tmp/report.pdf"
def test_multiple_media_files_all_delivered(self):
adapter = MagicMock()
adapter.send_voice = AsyncMock()
adapter.send_image_file = AsyncMock()
media_files = [("/tmp/voice.mp3", False), ("/tmp/photo.jpg", False)]
self._run_with_loop(adapter, "123", media_files, None, {"id": "j3"})
adapter.send_voice.assert_called_once()
adapter.send_image_file.assert_called_once()
class TestParallelTick:
"""Verify that tick() runs due jobs concurrently and isolates ContextVars."""
@pytest.fixture(autouse=True)
def _isolate_tick_lock(self, tmp_path):
"""Point the tick file lock at a per-test temp dir to avoid xdist contention."""
lock_dir = tmp_path / "cron"
lock_dir.mkdir()
with patch("cron.scheduler._LOCK_DIR", lock_dir), \
patch("cron.scheduler._LOCK_FILE", lock_dir / ".tick.lock"):
yield
def test_parallel_jobs_run_concurrently(self):
"""Two jobs launched in the same tick should overlap in time."""
import threading
import time
barrier = threading.Barrier(2, timeout=5)
call_order = []
def mock_run_job(job):
"""Each job hits a barrier — both must be active simultaneously."""
call_order.append(("start", job["id"]))
barrier.wait() # blocks until both threads reach here
call_order.append(("end", job["id"]))
return (True, "output", "response", None)
jobs = [
{"id": "job-a", "name": "a", "deliver": "local"},
{"id": "job-b", "name": "b", "deliver": "local"},
]
with patch("cron.scheduler.get_due_jobs", return_value=jobs), \
patch("cron.scheduler.advance_next_run"), \
patch("cron.scheduler.run_job", side_effect=mock_run_job), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result", return_value=None), \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
result = tick(verbose=False)
assert result == 2
# Both starts happened before both ends — proof of concurrency
starts = [i for i, (action, _) in enumerate(call_order) if action == "start"]
ends = [i for i, (action, _) in enumerate(call_order) if action == "end"]
assert len(starts) == 2
assert len(ends) == 2
assert max(starts) < min(ends), f"Jobs not concurrent: {call_order}"
def test_parallel_jobs_isolated_contextvars(self):
"""Each job's ContextVars must be isolated — no cross-contamination."""
from gateway.session_context import get_session_env
seen = {}
def mock_run_job(job):
origin = job.get("origin", {})
# run_job sets ContextVars — verify each job sees its own
from gateway.session_context import set_session_vars, clear_session_vars
tokens = set_session_vars(
platform=origin.get("platform", ""),
chat_id=str(origin.get("chat_id", "")),
)
import time
time.sleep(0.05) # give other thread time to set its vars
platform = get_session_env("HERMES_SESSION_PLATFORM")
chat_id = get_session_env("HERMES_SESSION_CHAT_ID")
seen[job["id"]] = {"platform": platform, "chat_id": chat_id}
clear_session_vars(tokens)
return (True, "output", "response", None)
jobs = [
{"id": "tg-job", "name": "tg", "deliver": "local",
"origin": {"platform": "telegram", "chat_id": "111"}},
{"id": "dc-job", "name": "dc", "deliver": "local",
"origin": {"platform": "discord", "chat_id": "222"}},
]
with patch("cron.scheduler.get_due_jobs", return_value=jobs), \
patch("cron.scheduler.advance_next_run"), \
patch("cron.scheduler.run_job", side_effect=mock_run_job), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result", return_value=None), \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
tick(verbose=False)
assert seen["tg-job"] == {"platform": "telegram", "chat_id": "111"}
assert seen["dc-job"] == {"platform": "discord", "chat_id": "222"}
def test_max_parallel_env_var(self, monkeypatch):
"""HERMES_CRON_MAX_PARALLEL=1 should restore serial behaviour."""
monkeypatch.setenv("HERMES_CRON_MAX_PARALLEL", "1")
call_times = []
def mock_run_job(job):
import time
call_times.append(("start", job["id"], time.monotonic()))
time.sleep(0.05)
call_times.append(("end", job["id"], time.monotonic()))
return (True, "output", "response", None)
jobs = [
{"id": "s1", "name": "s1", "deliver": "local"},
{"id": "s2", "name": "s2", "deliver": "local"},
]
with patch("cron.scheduler.get_due_jobs", return_value=jobs), \
patch("cron.scheduler.advance_next_run"), \
patch("cron.scheduler.run_job", side_effect=mock_run_job), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result", return_value=None), \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
result = tick(verbose=False)
assert result == 2
# With max_workers=1, second job starts after first ends
end_s1 = [t for action, jid, t in call_times if action == "end" and jid == "s1"][0]
start_s2 = [t for action, jid, t in call_times if action == "start" and jid == "s2"][0]
assert start_s2 >= end_s1, "Jobs ran concurrently despite max_parallel=1"
class TestDeliverResultTimeoutCancelsFuture:
"""When future.result(timeout=60) raises TimeoutError in the live
adapter delivery path, _deliver_result must cancel the orphan
coroutine so it cannot duplicate-send after the standalone fallback.
"""
def test_live_adapter_timeout_cancels_future_and_falls_back(self):
"""End-to-end: live adapter hangs past the 60s budget, _deliver_result
patches the timeout down to a fast value, confirms future.cancel() fires,
and verifies the standalone fallback path still delivers."""
from gateway.config import Platform
from concurrent.futures import Future
# Live adapter whose send() coroutine never resolves within the budget
adapter = AsyncMock()
adapter.send.return_value = MagicMock(success=True)
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
loop = MagicMock()
loop.is_running.return_value = True
# A real concurrent.futures.Future so .cancel() has real semantics,
# but we override .result() to raise TimeoutError exactly like the
# 60s wait firing in production.
captured_future = Future()
cancel_calls = []
original_cancel = captured_future.cancel
def tracking_cancel():
cancel_calls.append(True)
return original_cancel()
captured_future.cancel = tracking_cancel
captured_future.result = MagicMock(side_effect=TimeoutError("timed out"))
def fake_run_coro(coro, _loop):
coro.close()
return captured_future
job = {
"id": "timeout-job",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
standalone_send = AsyncMock(return_value={"success": True})
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro), \
patch("tools.send_message_tool._send_to_platform", new=standalone_send):
result = _deliver_result(
job,
"Hello world",
adapters={Platform.TELEGRAM: adapter},
loop=loop,
)
# 1. The orphan future was cancelled on timeout (the bug fix)
assert cancel_calls == [True], "future.cancel() must fire on TimeoutError"
# 2. The standalone fallback delivered — no double send, no silent drop
assert result is None, f"expected successful delivery, got error: {result!r}"
standalone_send.assert_awaited_once()
class TestSendMediaTimeoutCancelsFuture:
"""Same orphan-coroutine guarantee for _send_media_via_adapter's
future.result(timeout=30) call. If this times out mid-batch, the
in-flight coroutine must be cancelled before the next file is tried.
"""
def test_media_send_timeout_cancels_future_and_continues(self):
"""End-to-end: _send_media_via_adapter with a future whose .result()
raises TimeoutError. Assert cancel() fires and the loop proceeds
to the next file rather than hanging or crashing."""
from concurrent.futures import Future
adapter = MagicMock()
adapter.send_image_file = AsyncMock()
adapter.send_video = AsyncMock()
# First file: future that times out. Second file: future that resolves OK.
timeout_future = Future()
timeout_cancel_calls = []
original_cancel = timeout_future.cancel
def tracking_cancel():
timeout_cancel_calls.append(True)
return original_cancel()
timeout_future.cancel = tracking_cancel
timeout_future.result = MagicMock(side_effect=TimeoutError("timed out"))
ok_future = Future()
ok_future.set_result(MagicMock(success=True))
futures_iter = iter([timeout_future, ok_future])
def fake_run_coro(coro, _loop):
coro.close()
return next(futures_iter)
media_files = [
("/tmp/slow.png", False), # times out
("/tmp/fast.mp4", False), # succeeds
]
loop = MagicMock()
job = {"id": "media-timeout"}
with patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
# Should not raise — the except Exception clause swallows the timeout
_send_media_via_adapter(adapter, "chat-1", media_files, None, loop, job)
# 1. The timed-out future was cancelled (the bug fix)
assert timeout_cancel_calls == [True], "future.cancel() must fire on TimeoutError"
# 2. Second file still got dispatched — one timeout doesn't abort the batch
adapter.send_video.assert_called_once()
assert adapter.send_video.call_args[1]["video_path"] == "/tmp/fast.mp4"