hermes-agent/tests/cron/test_scheduler.py
Teknium e017131403 feat(cron): add wakeAgent gate — scripts can skip the agent entirely
Extends the existing cron script hook with a wake gate ported from
nanoclaw #1232. When a cron job's pre-check Python script (already
sandboxed to HERMES_HOME/scripts/) writes a JSON line like
```json
{"wakeAgent": false}
```
on its last stdout line, `run_job()` returns the SILENT marker and
skips the agent entirely — no LLM call, no delivery, no tokens spent.
Useful for frequent polls (every 1-5 min) that only need to wake the
agent when something has genuinely changed.

Any other script output (non-JSON, missing key, non-dict, `wakeAgent: true`,
truthy/falsy non-False values) behaves as before: stdout is injected
as context and the agent runs normally. Strict `False` is required
to skip — avoids accidental gating from arbitrary JSON.

Refactor:
- New pure helper `_parse_wake_gate(script_output)` in cron/scheduler.py
- `_build_job_prompt` accepts optional `prerun_script` tuple so the
  script runs exactly once per job (run_job runs it for the gate check,
  reuses the output for prompt injection)
- `run_job` short-circuits with SILENT_MARKER when gate fires

Script failures (success=False) still cannot trigger the gate — the
failure is reported as context to the agent as before.

This replaces the approach in closed PR #3837, which inlined bash
scripts via tempfile and lost the path-traversal/scripts-dir sandbox
that main's impl has. The wake-gate idea (the one net-new capability)
is ported on top of the existing sandboxed Python-script model.

Tests:
- 11 pure unit tests for _parse_wake_gate (empty, whitespace, non-JSON,
  non-dict JSON, missing key, truthy/falsy non-False, multi-line,
  trailing blanks, non-last-line JSON)
- 5 integration tests for run_job wake-gate (skip returns SILENT,
  wake-true passes through, script-runs-only-once, script failure
  doesn't gate, no-script regression)
- Full tests/cron/ suite: 194/194 pass
2026-04-19 01:42:35 -07:00

1435 lines
57 KiB
Python

"""Tests for cron/scheduler.py — origin resolution, delivery routing, and error logging."""
import json
import logging
import os
from unittest.mock import AsyncMock, patch, MagicMock
import pytest
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, _send_media_via_adapter, run_job, SILENT_MARKER, _build_job_prompt
from tools.env_passthrough import clear_env_passthrough
from tools.credential_files import clear_credential_files
class TestResolveOrigin:
def test_full_origin(self):
job = {
"origin": {
"platform": "telegram",
"chat_id": "123456",
"chat_name": "Test Chat",
"thread_id": "42",
}
}
result = _resolve_origin(job)
assert isinstance(result, dict)
assert result == job["origin"]
assert result["platform"] == "telegram"
assert result["chat_id"] == "123456"
assert result["chat_name"] == "Test Chat"
assert result["thread_id"] == "42"
def test_no_origin(self):
assert _resolve_origin({}) is None
assert _resolve_origin({"origin": None}) is None
def test_missing_platform(self):
job = {"origin": {"chat_id": "123"}}
assert _resolve_origin(job) is None
def test_missing_chat_id(self):
job = {"origin": {"platform": "telegram"}}
assert _resolve_origin(job) is None
def test_empty_origin(self):
job = {"origin": {}}
assert _resolve_origin(job) is None
class TestResolveDeliveryTarget:
def test_origin_delivery_preserves_thread_id(self):
job = {
"deliver": "origin",
"origin": {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
},
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
}
@pytest.mark.parametrize(
("platform", "env_var", "chat_id"),
[
("matrix", "MATRIX_HOME_ROOM", "!bot-room:example.org"),
("signal", "SIGNAL_HOME_CHANNEL", "+15551234567"),
("mattermost", "MATTERMOST_HOME_CHANNEL", "team-town-square"),
("sms", "SMS_HOME_CHANNEL", "+15557654321"),
("email", "EMAIL_HOME_ADDRESS", "home@example.com"),
("dingtalk", "DINGTALK_HOME_CHANNEL", "cidNNN"),
("feishu", "FEISHU_HOME_CHANNEL", "oc_home"),
("wecom", "WECOM_HOME_CHANNEL", "wecom-home"),
("weixin", "WEIXIN_HOME_CHANNEL", "wxid_home"),
("qqbot", "QQ_HOME_CHANNEL", "group-openid-home"),
],
)
def test_origin_delivery_without_origin_falls_back_to_supported_home_channels(
self, monkeypatch, platform, env_var, chat_id
):
for fallback_env in (
"MATRIX_HOME_ROOM",
"MATRIX_HOME_CHANNEL",
"TELEGRAM_HOME_CHANNEL",
"DISCORD_HOME_CHANNEL",
"SLACK_HOME_CHANNEL",
"SIGNAL_HOME_CHANNEL",
"MATTERMOST_HOME_CHANNEL",
"SMS_HOME_CHANNEL",
"EMAIL_HOME_ADDRESS",
"DINGTALK_HOME_CHANNEL",
"BLUEBUBBLES_HOME_CHANNEL",
"FEISHU_HOME_CHANNEL",
"WECOM_HOME_CHANNEL",
"WEIXIN_HOME_CHANNEL",
"QQ_HOME_CHANNEL",
):
monkeypatch.delenv(fallback_env, raising=False)
monkeypatch.setenv(env_var, chat_id)
assert _resolve_delivery_target({"deliver": "origin"}) == {
"platform": platform,
"chat_id": chat_id,
"thread_id": None,
}
def test_bare_matrix_delivery_uses_matrix_home_room(self, monkeypatch):
monkeypatch.delenv("MATRIX_HOME_CHANNEL", raising=False)
monkeypatch.setenv("MATRIX_HOME_ROOM", "!room123:example.org")
assert _resolve_delivery_target({"deliver": "matrix"}) == {
"platform": "matrix",
"chat_id": "!room123:example.org",
"thread_id": None,
}
def test_explicit_telegram_topic_target_with_thread_id(self):
"""deliver: 'telegram:chat_id:thread_id' parses correctly."""
job = {
"deliver": "telegram:-1003724596514:17",
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-1003724596514",
"thread_id": "17",
}
def test_explicit_telegram_chat_id_without_thread_id(self):
"""deliver: 'telegram:chat_id' sets thread_id to None."""
job = {
"deliver": "telegram:-1003724596514",
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-1003724596514",
"thread_id": None,
}
def test_human_friendly_label_resolved_via_channel_directory(self):
"""deliver: 'whatsapp:Alice (dm)' resolves to the real JID."""
job = {"deliver": "whatsapp:Alice (dm)"}
with patch(
"gateway.channel_directory.resolve_channel_name",
return_value="12345678901234@lid",
) as resolve_mock:
result = _resolve_delivery_target(job)
resolve_mock.assert_called_once_with("whatsapp", "Alice (dm)")
assert result == {
"platform": "whatsapp",
"chat_id": "12345678901234@lid",
"thread_id": None,
}
def test_human_friendly_label_without_suffix_resolved(self):
"""deliver: 'telegram:My Group' resolves without display suffix."""
job = {"deliver": "telegram:My Group"}
with patch(
"gateway.channel_directory.resolve_channel_name",
return_value="-1009999",
):
result = _resolve_delivery_target(job)
assert result == {
"platform": "telegram",
"chat_id": "-1009999",
"thread_id": None,
}
def test_human_friendly_topic_label_preserves_thread_id(self):
"""Resolved Telegram topic labels should split chat_id and thread_id."""
job = {"deliver": "telegram:Coaching Chat / topic 17585 (group)"}
with patch(
"gateway.channel_directory.resolve_channel_name",
return_value="-1009999:17585",
):
result = _resolve_delivery_target(job)
assert result == {
"platform": "telegram",
"chat_id": "-1009999",
"thread_id": "17585",
}
def test_raw_id_not_mangled_when_directory_returns_none(self):
"""deliver: 'whatsapp:12345@lid' passes through when directory has no match."""
job = {"deliver": "whatsapp:12345@lid"}
with patch(
"gateway.channel_directory.resolve_channel_name",
return_value=None,
):
result = _resolve_delivery_target(job)
assert result == {
"platform": "whatsapp",
"chat_id": "12345@lid",
"thread_id": None,
}
def test_bare_platform_uses_matching_origin_chat(self):
job = {
"deliver": "telegram",
"origin": {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
},
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
}
def test_bare_platform_falls_back_to_home_channel(self, monkeypatch):
monkeypatch.setenv("TELEGRAM_HOME_CHANNEL", "-2002")
job = {
"deliver": "telegram",
"origin": {
"platform": "discord",
"chat_id": "abc",
},
}
assert _resolve_delivery_target(job) == {
"platform": "telegram",
"chat_id": "-2002",
"thread_id": None,
}
def test_explicit_discord_topic_target_with_thread_id(self):
"""deliver: 'discord:chat_id:thread_id' parses correctly."""
job = {
"deliver": "discord:-1001234567890:17585",
}
assert _resolve_delivery_target(job) == {
"platform": "discord",
"chat_id": "-1001234567890",
"thread_id": "17585",
}
def test_explicit_discord_chat_id_without_thread_id(self):
"""deliver: 'discord:chat_id' sets thread_id to None."""
job = {
"deliver": "discord:9876543210",
}
assert _resolve_delivery_target(job) == {
"platform": "discord",
"chat_id": "9876543210",
"thread_id": None,
}
def test_explicit_discord_channel_without_thread(self):
"""deliver: 'discord:1001234567890' resolves via explicit platform:chat_id path."""
job = {
"deliver": "discord:1001234567890",
}
result = _resolve_delivery_target(job)
assert result == {
"platform": "discord",
"chat_id": "1001234567890",
"thread_id": None,
}
class TestDeliverResultWrapping:
"""Verify that cron deliveries are wrapped with header/footer and no longer mirrored."""
def test_delivery_wraps_content_with_header_and_footer(self):
"""Delivered content should include task name header and agent-invisible note."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock:
job = {
"id": "test-job",
"name": "daily-report",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Here is today's summary.")
send_mock.assert_called_once()
sent_content = send_mock.call_args.kwargs.get("content") or send_mock.call_args[0][-1]
assert "Cronjob Response: daily-report" in sent_content
assert "(job_id: test-job)" in sent_content
assert "-------------" in sent_content
assert "Here is today's summary." in sent_content
assert "To stop or manage this job" in sent_content
def test_delivery_uses_job_id_when_no_name(self):
"""When a job has no name, the wrapper should fall back to job id."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock:
job = {
"id": "abc-123",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Output.")
sent_content = send_mock.call_args.kwargs.get("content") or send_mock.call_args[0][-1]
assert "Cronjob Response: abc-123" in sent_content
def test_delivery_skips_wrapping_when_config_disabled(self):
"""When cron.wrap_response is false, deliver raw content without header/footer."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}):
job = {
"id": "test-job",
"name": "daily-report",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Clean output only.")
send_mock.assert_called_once()
sent_content = send_mock.call_args.kwargs.get("content") or send_mock.call_args[0][-1]
assert sent_content == "Clean output only."
assert "Cronjob Response" not in sent_content
assert "The agent cannot see" not in sent_content
def test_delivery_extracts_media_tags_before_send(self):
"""Cron delivery should pass MEDIA attachments separately to the send helper."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}):
job = {
"id": "voice-job",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Title\nMEDIA:/tmp/test-voice.ogg")
send_mock.assert_called_once()
args, kwargs = send_mock.call_args
# Text content should have MEDIA: tag stripped
assert "MEDIA:" not in args[3]
assert "Title" in args[3]
# Media files should be forwarded separately
assert kwargs["media_files"] == [("/tmp/test-voice.ogg", False)]
def test_live_adapter_sends_media_as_attachments(self):
"""When a live adapter is available, MEDIA files should be sent as native
platform attachments (e.g., Discord voice, Telegram audio) rather than
as literal 'MEDIA:/path' text."""
from gateway.config import Platform
from concurrent.futures import Future
adapter = AsyncMock()
adapter.send.return_value = MagicMock(success=True)
adapter.send_voice.return_value = MagicMock(success=True)
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.DISCORD: pconfig}
loop = MagicMock()
loop.is_running.return_value = True
# run_coroutine_threadsafe returns concurrent.futures.Future (has timeout kwarg)
def fake_run_coro(coro, _loop):
future = Future()
future.set_result(MagicMock(success=True))
coro.close()
return future
job = {
"id": "tts-job",
"deliver": "origin",
"origin": {"platform": "discord", "chat_id": "9876"},
}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
_deliver_result(
job,
"Here is TTS\nMEDIA:/tmp/cron-voice.mp3",
adapters={Platform.DISCORD: adapter},
loop=loop,
)
# Text should be sent without the MEDIA tag
adapter.send.assert_called_once()
text_sent = adapter.send.call_args[0][1]
assert "MEDIA:" not in text_sent
assert "Here is TTS" in text_sent
# Audio file should be sent as a voice attachment
adapter.send_voice.assert_called_once()
voice_call = adapter.send_voice.call_args
assert voice_call[1]["audio_path"] == "/tmp/cron-voice.mp3"
def test_live_adapter_routes_image_to_send_image_file(self):
"""Image MEDIA files should be routed to send_image_file, not send_voice."""
from gateway.config import Platform
from concurrent.futures import Future
adapter = AsyncMock()
adapter.send.return_value = MagicMock(success=True)
adapter.send_image_file.return_value = MagicMock(success=True)
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.DISCORD: pconfig}
loop = MagicMock()
loop.is_running.return_value = True
def fake_run_coro(coro, _loop):
future = Future()
future.set_result(MagicMock(success=True))
coro.close()
return future
job = {
"id": "img-job",
"deliver": "origin",
"origin": {"platform": "discord", "chat_id": "1234"},
}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
_deliver_result(
job,
"Chart attached\nMEDIA:/tmp/chart.png",
adapters={Platform.DISCORD: adapter},
loop=loop,
)
adapter.send_image_file.assert_called_once()
assert adapter.send_image_file.call_args[1]["image_path"] == "/tmp/chart.png"
adapter.send_voice.assert_not_called()
def test_live_adapter_media_only_no_text(self):
"""When content is ONLY a MEDIA tag with no text, media should still be sent."""
from gateway.config import Platform
from concurrent.futures import Future
adapter = AsyncMock()
adapter.send_voice.return_value = MagicMock(success=True)
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
loop = MagicMock()
loop.is_running.return_value = True
def fake_run_coro(coro, _loop):
future = Future()
future.set_result(MagicMock(success=True))
coro.close()
return future
job = {
"id": "voice-only",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "999"},
}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
_deliver_result(
job,
"MEDIA:/tmp/voice.ogg",
adapters={Platform.TELEGRAM: adapter},
loop=loop,
)
# Text send should NOT be called (no text after stripping MEDIA tag)
adapter.send.assert_not_called()
# Audio should still be delivered
adapter.send_voice.assert_called_once()
def test_live_adapter_sends_cleaned_text_not_raw(self):
"""The live adapter path must send cleaned text (MEDIA tags stripped),
not the raw delivery_content with embedded MEDIA: tags."""
from gateway.config import Platform
from concurrent.futures import Future
adapter = AsyncMock()
adapter.send.return_value = MagicMock(success=True)
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
loop = MagicMock()
loop.is_running.return_value = True
def fake_run_coro(coro, _loop):
future = Future()
future.set_result(MagicMock(success=True))
coro.close()
return future
job = {
"id": "img-job",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "555"},
}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
_deliver_result(
job,
"Report\nMEDIA:/tmp/chart.png",
adapters={Platform.TELEGRAM: adapter},
loop=loop,
)
text_sent = adapter.send.call_args[0][1]
assert "MEDIA:" not in text_sent
assert "Report" in text_sent
def test_no_mirror_to_session_call(self):
"""Cron deliveries should NOT mirror into the gateway session."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})), \
patch("gateway.mirror.mirror_to_session") as mirror_mock:
job = {
"id": "test-job",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Hello!")
mirror_mock.assert_not_called()
def test_origin_delivery_preserves_thread_id(self):
"""Origin delivery should forward thread_id to the send helper."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
job = {
"id": "test-job",
"name": "topic-job",
"deliver": "origin",
"origin": {
"platform": "telegram",
"chat_id": "-1001",
"thread_id": "17585",
},
}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock:
_deliver_result(job, "hello")
send_mock.assert_called_once()
assert send_mock.call_args.kwargs["thread_id"] == "17585"
class TestDeliverResultErrorReturns:
"""Verify _deliver_result returns error strings on failure, None on success."""
def test_returns_error_when_platform_disabled(self):
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = False
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg):
job = {
"id": "disabled",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
result = _deliver_result(job, "Output.")
assert result is not None
assert "not configured" in result
def test_returns_error_for_unresolved_target(self, monkeypatch):
"""Non-local delivery with no resolvable target should return an error."""
monkeypatch.delenv("TELEGRAM_HOME_CHANNEL", raising=False)
job = {"id": "no-target", "deliver": "telegram"}
result = _deliver_result(job, "Output.")
assert result is not None
assert "no delivery target" in result
class TestRunJobSessionPersistence:
def test_run_job_passes_session_db_and_cron_platform(self, tmp_path):
job = {
"id": "test-job",
"name": "test",
"prompt": "hello",
}
fake_db = MagicMock()
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "test-key",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
success, output, final_response, error = run_job(job)
assert success is True
assert error is None
assert final_response == "ok"
assert "ok" in output
kwargs = mock_agent_cls.call_args.kwargs
assert kwargs["session_db"] is fake_db
assert kwargs["platform"] == "cron"
assert kwargs["session_id"].startswith("cron_test-job_")
fake_db.end_session.assert_called_once()
call_args = fake_db.end_session.call_args
assert call_args[0][0].startswith("cron_test-job_")
assert call_args[0][1] == "cron_complete"
fake_db.close.assert_called_once()
def test_run_job_empty_response_returns_empty_not_placeholder(self, tmp_path):
"""Empty final_response should stay empty for delivery logic (issue #2234).
The placeholder '(No response generated)' should only appear in the
output log, not in the returned final_response that's used for delivery.
"""
job = {
"id": "silent-job",
"name": "silent test",
"prompt": "do work via tools only",
}
fake_db = MagicMock()
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "***",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
# Agent did work via tools but returned no text
mock_agent.run_conversation.return_value = {"final_response": ""}
mock_agent_cls.return_value = mock_agent
success, output, final_response, error = run_job(job)
assert success is True
assert error is None
# final_response should be empty for delivery logic to skip
assert final_response == ""
# But the output log should show the placeholder
assert "(No response generated)" in output
def test_tick_marks_empty_response_as_error(self, tmp_path):
"""When run_job returns success=True but final_response is empty,
tick() should mark the job as error so last_status != 'ok'.
(issue #8585)
"""
from cron.scheduler import tick
from cron.jobs import load_jobs, save_jobs
job = {
"id": "empty-job",
"name": "empty-test",
"prompt": "do something",
"schedule": "every 1h",
"enabled": True,
"next_run_at": "2020-01-01T00:00:00",
"deliver": "local",
"last_status": None,
}
fake_db = MagicMock()
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler.get_due_jobs", return_value=[job]), \
patch("cron.scheduler.advance_next_run"), \
patch("cron.scheduler.mark_job_run") as mock_mark, \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("cron.scheduler.run_job", return_value=(True, "output", "", None)):
tick(verbose=False)
# Should be called with success=False because final_response is empty
mock_mark.assert_called_once()
call_args = mock_mark.call_args
assert call_args[0][0] == "empty-job"
assert call_args[0][1] is False # success should be False
assert "empty" in call_args[0][2].lower() # error should mention empty
def test_run_job_sets_auto_delivery_env_from_dotenv_home_channel(self, tmp_path, monkeypatch):
job = {
"id": "test-job",
"name": "test",
"prompt": "hello",
"deliver": "telegram",
}
fake_db = MagicMock()
seen = {}
(tmp_path / ".env").write_text("TELEGRAM_HOME_CHANNEL=-2002\n")
monkeypatch.delenv("TELEGRAM_HOME_CHANNEL", raising=False)
monkeypatch.delenv("HERMES_CRON_AUTO_DELIVER_PLATFORM", raising=False)
monkeypatch.delenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID", raising=False)
monkeypatch.delenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID", raising=False)
class FakeAgent:
def __init__(self, *args, **kwargs):
pass
def run_conversation(self, *args, **kwargs):
seen["platform"] = os.getenv("HERMES_CRON_AUTO_DELIVER_PLATFORM")
seen["chat_id"] = os.getenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID")
seen["thread_id"] = os.getenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID")
return {"final_response": "ok"}
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "***",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("run_agent.AIAgent", FakeAgent):
success, output, final_response, error = run_job(job)
assert success is True
assert error is None
assert final_response == "ok"
assert "ok" in output
assert seen == {
"platform": "telegram",
"chat_id": "-2002",
"thread_id": None,
}
assert os.getenv("HERMES_CRON_AUTO_DELIVER_PLATFORM") is None
assert os.getenv("HERMES_CRON_AUTO_DELIVER_CHAT_ID") is None
assert os.getenv("HERMES_CRON_AUTO_DELIVER_THREAD_ID") is None
fake_db.close.assert_called_once()
class TestRunJobConfigLogging:
"""Verify that config.yaml parse failures are logged, not silently swallowed."""
def test_bad_config_yaml_is_logged(self, caplog, tmp_path):
"""When config.yaml is malformed, a warning should be logged."""
bad_yaml = tmp_path / "config.yaml"
bad_yaml.write_text("invalid: yaml: [[[bad")
job = {
"id": "test-job",
"name": "test",
"prompt": "hello",
}
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
with caplog.at_level(logging.WARNING, logger="cron.scheduler"):
run_job(job)
assert any("failed to load config.yaml" in r.message for r in caplog.records), \
f"Expected 'failed to load config.yaml' warning in logs, got: {[r.message for r in caplog.records]}"
def test_bad_prefill_messages_is_logged(self, caplog, tmp_path):
"""When the prefill messages file contains invalid JSON, a warning should be logged."""
# Valid config.yaml that points to a bad prefill file
config_yaml = tmp_path / "config.yaml"
config_yaml.write_text("prefill_messages_file: prefill.json\n")
bad_prefill = tmp_path / "prefill.json"
bad_prefill.write_text("{not valid json!!!")
job = {
"id": "test-job",
"name": "test",
"prompt": "hello",
}
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
with caplog.at_level(logging.WARNING, logger="cron.scheduler"):
run_job(job)
assert any("failed to parse prefill messages" in r.message for r in caplog.records), \
f"Expected 'failed to parse prefill messages' warning in logs, got: {[r.message for r in caplog.records]}"
class TestRunJobSkillBacked:
def test_run_job_preserves_skill_env_passthrough_into_worker_thread(self, tmp_path):
job = {
"id": "skill-env-job",
"name": "skill env test",
"prompt": "Use the skill.",
"skill": "notion",
}
fake_db = MagicMock()
def _skill_view(name):
assert name == "notion"
from tools.env_passthrough import register_env_passthrough
register_env_passthrough(["NOTION_API_KEY"])
return json.dumps({"success": True, "content": "# notion\nUse Notion."})
def _run_conversation(prompt):
from tools.env_passthrough import get_all_passthrough
assert "NOTION_API_KEY" in get_all_passthrough()
return {"final_response": "ok"}
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "***",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("tools.skills_tool.skill_view", side_effect=_skill_view), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.side_effect = _run_conversation
mock_agent_cls.return_value = mock_agent
try:
success, output, final_response, error = run_job(job)
finally:
clear_env_passthrough()
assert success is True
assert error is None
assert final_response == "ok"
def test_run_job_preserves_credential_file_passthrough_into_worker_thread(self, tmp_path):
"""copy_context() also propagates credential_files ContextVar."""
job = {
"id": "cred-env-job",
"name": "cred file test",
"prompt": "Use the skill.",
"skill": "google-workspace",
}
fake_db = MagicMock()
# Create a credential file so register_credential_file succeeds
cred_dir = tmp_path / "credentials"
cred_dir.mkdir()
(cred_dir / "google_token.json").write_text('{"token": "t"}')
def _skill_view(name):
assert name == "google-workspace"
from tools.credential_files import register_credential_file
register_credential_file("credentials/google_token.json")
return json.dumps({"success": True, "content": "# google-workspace\nUse Google."})
def _run_conversation(prompt):
from tools.credential_files import _get_registered
registered = _get_registered()
assert registered, "credential files must be visible in worker thread"
assert any("google_token.json" in v for v in registered.values())
return {"final_response": "ok"}
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("tools.credential_files._resolve_hermes_home", return_value=tmp_path), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "***",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("tools.skills_tool.skill_view", side_effect=_skill_view), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.side_effect = _run_conversation
mock_agent_cls.return_value = mock_agent
try:
success, output, final_response, error = run_job(job)
finally:
clear_credential_files()
assert success is True
assert error is None
assert final_response == "ok"
def test_run_job_loads_skill_and_disables_recursive_cron_tools(self, tmp_path):
job = {
"id": "skill-job",
"name": "skill test",
"prompt": "Check the feeds and summarize anything new.",
"skill": "blogwatcher",
}
fake_db = MagicMock()
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "***",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("tools.skills_tool.skill_view", return_value=json.dumps({"success": True, "content": "# Blogwatcher\nFollow this skill."})), \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
success, output, final_response, error = run_job(job)
assert success is True
assert error is None
assert final_response == "ok"
kwargs = mock_agent_cls.call_args.kwargs
assert "cronjob" in (kwargs["disabled_toolsets"] or [])
prompt_arg = mock_agent.run_conversation.call_args.args[0]
assert "blogwatcher" in prompt_arg
assert "Follow this skill" in prompt_arg
assert "Check the feeds and summarize anything new." in prompt_arg
def test_run_job_loads_multiple_skills_in_order(self, tmp_path):
job = {
"id": "multi-skill-job",
"name": "multi skill test",
"prompt": "Combine the results.",
"skills": ["blogwatcher", "find-nearby"],
}
fake_db = MagicMock()
def _skill_view(name):
return json.dumps({"success": True, "content": f"# {name}\nInstructions for {name}."})
with patch("cron.scheduler._hermes_home", tmp_path), \
patch("cron.scheduler._resolve_origin", return_value=None), \
patch("dotenv.load_dotenv"), \
patch("hermes_state.SessionDB", return_value=fake_db), \
patch(
"hermes_cli.runtime_provider.resolve_runtime_provider",
return_value={
"api_key": "***",
"base_url": "https://example.invalid/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
},
), \
patch("tools.skills_tool.skill_view", side_effect=_skill_view) as skill_view_mock, \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_agent = MagicMock()
mock_agent.run_conversation.return_value = {"final_response": "ok"}
mock_agent_cls.return_value = mock_agent
success, output, final_response, error = run_job(job)
assert success is True
assert error is None
assert final_response == "ok"
assert skill_view_mock.call_count == 2
assert [call.args[0] for call in skill_view_mock.call_args_list] == ["blogwatcher", "find-nearby"]
prompt_arg = mock_agent.run_conversation.call_args.args[0]
assert prompt_arg.index("blogwatcher") < prompt_arg.index("find-nearby")
assert "Instructions for blogwatcher." in prompt_arg
assert "Instructions for find-nearby." in prompt_arg
assert "Combine the results." in prompt_arg
class TestSilentDelivery:
"""Verify that [SILENT] responses suppress delivery while still saving output."""
def _make_job(self):
return {
"id": "monitor-job",
"name": "monitor",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
def test_silent_response_suppresses_delivery(self, caplog):
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(True, "# output", "[SILENT]", None)), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
with caplog.at_level(logging.INFO, logger="cron.scheduler"):
tick(verbose=False)
deliver_mock.assert_not_called()
assert any(SILENT_MARKER in r.message for r in caplog.records)
def test_silent_with_note_suppresses_delivery(self):
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(True, "# output", "[SILENT] No changes detected", None)), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
tick(verbose=False)
deliver_mock.assert_not_called()
def test_silent_trailing_suppresses_delivery(self):
"""Agent appended [SILENT] after explanation text — must still suppress."""
response = "2 deals filtered out (like<10, reply<15).\n\n[SILENT]"
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(True, "# output", response, None)), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
tick(verbose=False)
deliver_mock.assert_not_called()
def test_silent_is_case_insensitive(self):
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(True, "# output", "[silent] nothing new", None)), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
tick(verbose=False)
deliver_mock.assert_not_called()
def test_failed_job_always_delivers(self):
"""Failed jobs deliver regardless of [SILENT] in output."""
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(False, "# output", "", "some error")), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
from cron.scheduler import tick
tick(verbose=False)
deliver_mock.assert_called_once()
def test_output_saved_even_when_delivery_suppressed(self):
with patch("cron.scheduler.get_due_jobs", return_value=[self._make_job()]), \
patch("cron.scheduler.run_job", return_value=(True, "# full output", "[SILENT]", None)), \
patch("cron.scheduler.save_job_output") as save_mock, \
patch("cron.scheduler._deliver_result") as deliver_mock, \
patch("cron.scheduler.mark_job_run"):
save_mock.return_value = "/tmp/out.md"
from cron.scheduler import tick
tick(verbose=False)
save_mock.assert_called_once_with("monitor-job", "# full output")
deliver_mock.assert_not_called()
class TestBuildJobPromptSilentHint:
"""Verify _build_job_prompt always injects [SILENT] guidance."""
def test_hint_always_present(self):
job = {"prompt": "Check for updates"}
result = _build_job_prompt(job)
assert "[SILENT]" in result
assert "Check for updates" in result
def test_hint_present_even_without_prompt(self):
job = {"prompt": ""}
result = _build_job_prompt(job)
assert "[SILENT]" in result
def test_delivery_guidance_present(self):
"""Cron hint tells agents their final response is auto-delivered."""
job = {"prompt": "Generate a report"}
result = _build_job_prompt(job)
assert "do NOT use send_message" in result
assert "automatically delivered" in result
def test_delivery_guidance_precedes_user_prompt(self):
"""System guidance appears before the user's prompt text."""
job = {"prompt": "My custom prompt"}
result = _build_job_prompt(job)
system_pos = result.index("do NOT use send_message")
prompt_pos = result.index("My custom prompt")
assert system_pos < prompt_pos
class TestParseWakeGate:
"""Unit tests for _parse_wake_gate — pure function, no side effects."""
def test_empty_output_wakes(self):
from cron.scheduler import _parse_wake_gate
assert _parse_wake_gate("") is True
assert _parse_wake_gate(None) is True
def test_whitespace_only_wakes(self):
from cron.scheduler import _parse_wake_gate
assert _parse_wake_gate(" \n\n \t\n") is True
def test_non_json_last_line_wakes(self):
from cron.scheduler import _parse_wake_gate
assert _parse_wake_gate("hello world") is True
assert _parse_wake_gate("line 1\nline 2\nplain text") is True
def test_json_non_dict_wakes(self):
"""Bare arrays, numbers, strings must not be interpreted as a gate."""
from cron.scheduler import _parse_wake_gate
assert _parse_wake_gate("[1, 2, 3]") is True
assert _parse_wake_gate("42") is True
assert _parse_wake_gate('"wakeAgent"') is True
def test_wake_gate_false_skips(self):
from cron.scheduler import _parse_wake_gate
assert _parse_wake_gate('{"wakeAgent": false}') is False
def test_wake_gate_true_wakes(self):
from cron.scheduler import _parse_wake_gate
assert _parse_wake_gate('{"wakeAgent": true}') is True
def test_wake_gate_missing_wakes(self):
"""A JSON dict without a wakeAgent key defaults to waking."""
from cron.scheduler import _parse_wake_gate
assert _parse_wake_gate('{"data": {"foo": "bar"}}') is True
def test_non_boolean_false_still_wakes(self):
"""Only strict ``False`` skips — truthy/falsy shortcuts are too risky."""
from cron.scheduler import _parse_wake_gate
assert _parse_wake_gate('{"wakeAgent": 0}') is True
assert _parse_wake_gate('{"wakeAgent": null}') is True
assert _parse_wake_gate('{"wakeAgent": ""}') is True
def test_only_last_non_empty_line_parsed(self):
from cron.scheduler import _parse_wake_gate
multi = 'some log output\nmore output\n{"wakeAgent": false}'
assert _parse_wake_gate(multi) is False
def test_trailing_blank_lines_ignored(self):
from cron.scheduler import _parse_wake_gate
multi = '{"wakeAgent": false}\n\n\n'
assert _parse_wake_gate(multi) is False
def test_non_last_json_line_does_not_gate(self):
"""A JSON gate on an earlier line with plain text after it does NOT trigger."""
from cron.scheduler import _parse_wake_gate
multi = '{"wakeAgent": false}\nactually this is the real output'
assert _parse_wake_gate(multi) is True
class TestRunJobWakeGate:
"""Integration tests for run_job wake-gate short-circuit."""
def _make_job(self, name="wake-gate-test", script="check.py"):
"""Minimal valid cron job dict for run_job."""
return {
"id": f"job_{name}",
"name": name,
"prompt": "Do a thing",
"schedule": "*/5 * * * *",
"script": script,
}
def test_wake_false_skips_agent_and_returns_silent(self, caplog):
"""When _run_job_script output ends with {wakeAgent: false}, the agent
is not invoked and run_job returns the SILENT marker so delivery is
suppressed."""
from cron.scheduler import SILENT_MARKER
import cron.scheduler as scheduler
with patch.object(scheduler, "_run_job_script",
return_value=(True, '{"wakeAgent": false}')), \
patch("run_agent.AIAgent") as agent_cls:
success, doc, final, err = scheduler.run_job(self._make_job())
assert success is True
assert err is None
assert final == SILENT_MARKER
assert "Script gate returned `wakeAgent=false`" in doc
agent_cls.assert_not_called()
def test_wake_true_runs_agent_with_injected_output(self):
"""When the script returns {wakeAgent: true, data: ...}, the agent is
invoked and the data line still shows up in the prompt."""
import cron.scheduler as scheduler
script_output = '{"wakeAgent": true, "data": {"new": 3}}'
agent = MagicMock()
agent.run_conversation = MagicMock(return_value={
"final_response": "ok", "messages": []
})
with patch.object(scheduler, "_run_job_script",
return_value=(True, script_output)), \
patch("run_agent.AIAgent", return_value=agent) as agent_cls:
success, doc, final, err = scheduler.run_job(self._make_job())
agent_cls.assert_called_once()
# The script output should be visible in the prompt passed to
# run_conversation.
call_kwargs = agent.run_conversation.call_args
prompt_arg = call_kwargs.args[0] if call_kwargs.args else call_kwargs.kwargs.get("user_message", "")
assert script_output in prompt_arg
assert success is True
assert err is None
def test_script_runs_only_once_on_wake(self):
"""Wake-true path must not re-run the script inside _build_job_prompt
(script would execute twice otherwise, wasting work and risking
double-side-effects)."""
import cron.scheduler as scheduler
call_count = 0
def _script_stub(path):
nonlocal call_count
call_count += 1
return (True, "regular output")
agent = MagicMock()
agent.run_conversation = MagicMock(return_value={
"final_response": "ok", "messages": []
})
with patch.object(scheduler, "_run_job_script", side_effect=_script_stub), \
patch("run_agent.AIAgent", return_value=agent):
scheduler.run_job(self._make_job())
assert call_count == 1, f"script ran {call_count}x, expected exactly 1"
def test_script_failure_does_not_trigger_gate(self):
"""If _run_job_script returns success=False, the gate is NOT evaluated
and the agent still runs (the failure is reported as context)."""
import cron.scheduler as scheduler
# Malicious or broken script whose stderr happens to contain the
# gate JSON — we must NOT honor it because ran_ok is False.
agent = MagicMock()
agent.run_conversation = MagicMock(return_value={
"final_response": "ok", "messages": []
})
with patch.object(scheduler, "_run_job_script",
return_value=(False, '{"wakeAgent": false}')), \
patch("run_agent.AIAgent", return_value=agent) as agent_cls:
success, doc, final, err = scheduler.run_job(self._make_job())
agent_cls.assert_called_once() # Agent DID wake despite the gate-like text
def test_no_script_path_runs_agent_normally(self):
"""Regression: jobs without a script still work."""
import cron.scheduler as scheduler
agent = MagicMock()
agent.run_conversation = MagicMock(return_value={
"final_response": "ok", "messages": []
})
job = self._make_job(script=None)
job.pop("script", None)
with patch.object(scheduler, "_run_job_script") as script_fn, \
patch("run_agent.AIAgent", return_value=agent) as agent_cls:
scheduler.run_job(job)
script_fn.assert_not_called()
agent_cls.assert_called_once()
class TestBuildJobPromptMissingSkill:
"""Verify that a missing skill logs a warning and does not crash the job."""
def _missing_skill_view(self, name: str) -> str:
return json.dumps({"success": False, "error": f"Skill '{name}' not found."})
def test_missing_skill_does_not_raise(self):
"""Job should run even when a referenced skill is not installed."""
with patch("tools.skills_tool.skill_view", side_effect=self._missing_skill_view):
result = _build_job_prompt({"skills": ["ghost-skill"], "prompt": "do something"})
# prompt is preserved even though skill was skipped
assert "do something" in result
def test_missing_skill_injects_user_notice_into_prompt(self):
"""A system notice about the missing skill is injected into the prompt."""
with patch("tools.skills_tool.skill_view", side_effect=self._missing_skill_view):
result = _build_job_prompt({"skills": ["ghost-skill"], "prompt": "do something"})
assert "ghost-skill" in result
assert "not found" in result.lower() or "skipped" in result.lower()
def test_missing_skill_logs_warning(self, caplog):
"""A warning is logged when a skill cannot be found."""
with caplog.at_level(logging.WARNING, logger="cron.scheduler"):
with patch("tools.skills_tool.skill_view", side_effect=self._missing_skill_view):
_build_job_prompt({"name": "My Job", "skills": ["ghost-skill"], "prompt": "do something"})
assert any("ghost-skill" in record.message for record in caplog.records)
def test_valid_skill_loaded_alongside_missing(self):
"""A valid skill is still loaded when another skill in the list is missing."""
def _mixed_skill_view(name: str) -> str:
if name == "real-skill":
return json.dumps({"success": True, "content": "Real skill content."})
return json.dumps({"success": False, "error": f"Skill '{name}' not found."})
with patch("tools.skills_tool.skill_view", side_effect=_mixed_skill_view):
result = _build_job_prompt({"skills": ["ghost-skill", "real-skill"], "prompt": "go"})
assert "Real skill content." in result
assert "go" in result
class TestSendMediaViaAdapter:
"""Unit tests for _send_media_via_adapter — routes files to typed adapter methods."""
@staticmethod
def _run_with_loop(adapter, chat_id, media_files, metadata, job):
"""Helper: run _send_media_via_adapter with a real running event loop."""
import asyncio
import threading
loop = asyncio.new_event_loop()
t = threading.Thread(target=loop.run_forever, daemon=True)
t.start()
try:
_send_media_via_adapter(adapter, chat_id, media_files, metadata, loop, job)
finally:
loop.call_soon_threadsafe(loop.stop)
t.join(timeout=5)
loop.close()
def test_video_dispatched_to_send_video(self):
adapter = MagicMock()
adapter.send_video = AsyncMock()
media_files = [("/tmp/clip.mp4", False)]
self._run_with_loop(adapter, "123", media_files, None, {"id": "j1"})
adapter.send_video.assert_called_once()
assert adapter.send_video.call_args[1]["video_path"] == "/tmp/clip.mp4"
def test_unknown_ext_dispatched_to_send_document(self):
adapter = MagicMock()
adapter.send_document = AsyncMock()
media_files = [("/tmp/report.pdf", False)]
self._run_with_loop(adapter, "123", media_files, None, {"id": "j2"})
adapter.send_document.assert_called_once()
assert adapter.send_document.call_args[1]["file_path"] == "/tmp/report.pdf"
def test_multiple_media_files_all_delivered(self):
adapter = MagicMock()
adapter.send_voice = AsyncMock()
adapter.send_image_file = AsyncMock()
media_files = [("/tmp/voice.mp3", False), ("/tmp/photo.jpg", False)]
self._run_with_loop(adapter, "123", media_files, None, {"id": "j3"})
adapter.send_voice.assert_called_once()
adapter.send_image_file.assert_called_once()