mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
test(cron): exercise _deliver_result and _send_media_via_adapter directly for timeout-cancel
The original tests replicated the try/except/cancel/raise pattern inline with a mocked future, which tested Python's try/except semantics rather than the scheduler's behavior. Rewrite them to invoke _deliver_result and _send_media_via_adapter end-to-end with a real concurrent.futures.Future whose .result() raises TimeoutError. Mutation-verified: both tests fail when the try/except wrappers are removed from cron/scheduler.py, pass with them in place.
This commit is contained in:
parent
18e7fd8364
commit
267b2faa15
1 changed files with 101 additions and 46 deletions
|
|
@ -1580,73 +1580,128 @@ class TestParallelTick:
|
|||
end_s1 = [t for action, jid, t in call_times if action == "end" and jid == "s1"][0]
|
||||
start_s2 = [t for action, jid, t in call_times if action == "start" and jid == "s2"][0]
|
||||
assert start_s2 >= end_s1, "Jobs ran concurrently despite max_parallel=1"
|
||||
async def _noop_coro():
|
||||
"""Placeholder coroutine used by timeout-cancel tests."""
|
||||
return None
|
||||
|
||||
|
||||
class TestDeliverResultTimeoutCancelsFuture:
|
||||
"""When future.result(timeout=60) raises TimeoutError in the live
|
||||
adapter delivery path, the orphan coroutine must be cancelled before
|
||||
the exception propagates to the standalone fallback.
|
||||
adapter delivery path, _deliver_result must cancel the orphan
|
||||
coroutine so it cannot duplicate-send after the standalone fallback.
|
||||
"""
|
||||
|
||||
def test_timeout_cancels_future_before_fallback(self):
|
||||
"""TimeoutError from future.result must trigger future.cancel()."""
|
||||
def test_live_adapter_timeout_cancels_future_and_falls_back(self):
|
||||
"""End-to-end: live adapter hangs past the 60s budget, _deliver_result
|
||||
patches the timeout down to a fast value, confirms future.cancel() fires,
|
||||
and verifies the standalone fallback path still delivers."""
|
||||
from gateway.config import Platform
|
||||
from concurrent.futures import Future
|
||||
|
||||
future = MagicMock(spec=Future)
|
||||
future.result.side_effect = TimeoutError("timed out")
|
||||
# Live adapter whose send() coroutine never resolves within the budget
|
||||
adapter = AsyncMock()
|
||||
adapter.send.return_value = MagicMock(success=True)
|
||||
|
||||
def fake_run_coro(coro, loop):
|
||||
pconfig = MagicMock()
|
||||
pconfig.enabled = True
|
||||
mock_cfg = MagicMock()
|
||||
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
|
||||
|
||||
loop = MagicMock()
|
||||
loop.is_running.return_value = True
|
||||
|
||||
# A real concurrent.futures.Future so .cancel() has real semantics,
|
||||
# but we override .result() to raise TimeoutError exactly like the
|
||||
# 60s wait firing in production.
|
||||
captured_future = Future()
|
||||
cancel_calls = []
|
||||
original_cancel = captured_future.cancel
|
||||
|
||||
def tracking_cancel():
|
||||
cancel_calls.append(True)
|
||||
return original_cancel()
|
||||
|
||||
captured_future.cancel = tracking_cancel
|
||||
captured_future.result = MagicMock(side_effect=TimeoutError("timed out"))
|
||||
|
||||
def fake_run_coro(coro, _loop):
|
||||
coro.close()
|
||||
return future
|
||||
return captured_future
|
||||
|
||||
with patch(
|
||||
"asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro
|
||||
):
|
||||
with pytest.raises(TimeoutError):
|
||||
import asyncio
|
||||
f = asyncio.run_coroutine_threadsafe(
|
||||
_noop_coro(), MagicMock()
|
||||
)
|
||||
try:
|
||||
f.result(timeout=60)
|
||||
except TimeoutError:
|
||||
f.cancel()
|
||||
raise
|
||||
job = {
|
||||
"id": "timeout-job",
|
||||
"deliver": "origin",
|
||||
"origin": {"platform": "telegram", "chat_id": "123"},
|
||||
}
|
||||
|
||||
future.cancel.assert_called_once()
|
||||
standalone_send = AsyncMock(return_value={"success": True})
|
||||
|
||||
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
|
||||
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
|
||||
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro), \
|
||||
patch("tools.send_message_tool._send_to_platform", new=standalone_send):
|
||||
result = _deliver_result(
|
||||
job,
|
||||
"Hello world",
|
||||
adapters={Platform.TELEGRAM: adapter},
|
||||
loop=loop,
|
||||
)
|
||||
|
||||
# 1. The orphan future was cancelled on timeout (the bug fix)
|
||||
assert cancel_calls == [True], "future.cancel() must fire on TimeoutError"
|
||||
# 2. The standalone fallback delivered — no double send, no silent drop
|
||||
assert result is None, f"expected successful delivery, got error: {result!r}"
|
||||
standalone_send.assert_awaited_once()
|
||||
|
||||
|
||||
class TestSendMediaTimeoutCancelsFuture:
|
||||
"""Same orphan-coroutine guarantee for _send_media_via_adapter's
|
||||
future.result(timeout=30) call.
|
||||
future.result(timeout=30) call. If this times out mid-batch, the
|
||||
in-flight coroutine must be cancelled before the next file is tried.
|
||||
"""
|
||||
|
||||
def test_media_timeout_cancels_future(self):
|
||||
"""TimeoutError from the media-send future must call cancel()."""
|
||||
def test_media_send_timeout_cancels_future_and_continues(self):
|
||||
"""End-to-end: _send_media_via_adapter with a future whose .result()
|
||||
raises TimeoutError. Assert cancel() fires and the loop proceeds
|
||||
to the next file rather than hanging or crashing."""
|
||||
from concurrent.futures import Future
|
||||
|
||||
future = MagicMock(spec=Future)
|
||||
future.result.side_effect = TimeoutError("timed out")
|
||||
adapter = MagicMock()
|
||||
adapter.send_image_file = AsyncMock()
|
||||
adapter.send_video = AsyncMock()
|
||||
|
||||
def fake_run_coro(coro, loop):
|
||||
# First file: future that times out. Second file: future that resolves OK.
|
||||
timeout_future = Future()
|
||||
timeout_cancel_calls = []
|
||||
original_cancel = timeout_future.cancel
|
||||
|
||||
def tracking_cancel():
|
||||
timeout_cancel_calls.append(True)
|
||||
return original_cancel()
|
||||
|
||||
timeout_future.cancel = tracking_cancel
|
||||
timeout_future.result = MagicMock(side_effect=TimeoutError("timed out"))
|
||||
|
||||
ok_future = Future()
|
||||
ok_future.set_result(MagicMock(success=True))
|
||||
|
||||
futures_iter = iter([timeout_future, ok_future])
|
||||
|
||||
def fake_run_coro(coro, _loop):
|
||||
coro.close()
|
||||
return future
|
||||
return next(futures_iter)
|
||||
|
||||
with patch(
|
||||
"asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro
|
||||
):
|
||||
with pytest.raises(TimeoutError):
|
||||
import asyncio
|
||||
f = asyncio.run_coroutine_threadsafe(
|
||||
_noop_coro(), MagicMock()
|
||||
)
|
||||
try:
|
||||
f.result(timeout=30)
|
||||
except TimeoutError:
|
||||
f.cancel()
|
||||
raise
|
||||
media_files = [
|
||||
("/tmp/slow.png", False), # times out
|
||||
("/tmp/fast.mp4", False), # succeeds
|
||||
]
|
||||
|
||||
future.cancel.assert_called_once()
|
||||
loop = MagicMock()
|
||||
job = {"id": "media-timeout"}
|
||||
|
||||
with patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
|
||||
# Should not raise — the except Exception clause swallows the timeout
|
||||
_send_media_via_adapter(adapter, "chat-1", media_files, None, loop, job)
|
||||
|
||||
# 1. The timed-out future was cancelled (the bug fix)
|
||||
assert timeout_cancel_calls == [True], "future.cancel() must fire on TimeoutError"
|
||||
# 2. Second file still got dispatched — one timeout doesn't abort the batch
|
||||
adapter.send_video.assert_called_once()
|
||||
assert adapter.send_video.call_args[1]["video_path"] == "/tmp/fast.mp4"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue