From 267b2faa15bfc6dfd7336ba492a07b8d364c413b Mon Sep 17 00:00:00 2001 From: teknium1 Date: Tue, 21 Apr 2026 05:46:18 -0700 Subject: [PATCH] test(cron): exercise _deliver_result and _send_media_via_adapter directly for timeout-cancel The original tests replicated the try/except/cancel/raise pattern inline with a mocked future, which tested Python's try/except semantics rather than the scheduler's behavior. Rewrite them to invoke _deliver_result and _send_media_via_adapter end-to-end with a real concurrent.futures.Future whose .result() raises TimeoutError. Mutation-verified: both tests fail when the try/except wrappers are removed from cron/scheduler.py, pass with them in place. --- tests/cron/test_scheduler.py | 147 ++++++++++++++++++++++++----------- 1 file changed, 101 insertions(+), 46 deletions(-) diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index c4c722d69..524490eb0 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -1580,73 +1580,128 @@ class TestParallelTick: end_s1 = [t for action, jid, t in call_times if action == "end" and jid == "s1"][0] start_s2 = [t for action, jid, t in call_times if action == "start" and jid == "s2"][0] assert start_s2 >= end_s1, "Jobs ran concurrently despite max_parallel=1" -async def _noop_coro(): - """Placeholder coroutine used by timeout-cancel tests.""" - return None class TestDeliverResultTimeoutCancelsFuture: """When future.result(timeout=60) raises TimeoutError in the live - adapter delivery path, the orphan coroutine must be cancelled before - the exception propagates to the standalone fallback. + adapter delivery path, _deliver_result must cancel the orphan + coroutine so it cannot duplicate-send after the standalone fallback. """ - def test_timeout_cancels_future_before_fallback(self): - """TimeoutError from future.result must trigger future.cancel().""" + def test_live_adapter_timeout_cancels_future_and_falls_back(self): + """End-to-end: live adapter hangs past the 60s budget, _deliver_result + patches the timeout down to a fast value, confirms future.cancel() fires, + and verifies the standalone fallback path still delivers.""" + from gateway.config import Platform from concurrent.futures import Future - future = MagicMock(spec=Future) - future.result.side_effect = TimeoutError("timed out") + # Live adapter whose send() coroutine never resolves within the budget + adapter = AsyncMock() + adapter.send.return_value = MagicMock(success=True) - def fake_run_coro(coro, loop): + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + loop = MagicMock() + loop.is_running.return_value = True + + # A real concurrent.futures.Future so .cancel() has real semantics, + # but we override .result() to raise TimeoutError exactly like the + # 60s wait firing in production. + captured_future = Future() + cancel_calls = [] + original_cancel = captured_future.cancel + + def tracking_cancel(): + cancel_calls.append(True) + return original_cancel() + + captured_future.cancel = tracking_cancel + captured_future.result = MagicMock(side_effect=TimeoutError("timed out")) + + def fake_run_coro(coro, _loop): coro.close() - return future + return captured_future - with patch( - "asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro - ): - with pytest.raises(TimeoutError): - import asyncio - f = asyncio.run_coroutine_threadsafe( - _noop_coro(), MagicMock() - ) - try: - f.result(timeout=60) - except TimeoutError: - f.cancel() - raise + job = { + "id": "timeout-job", + "deliver": "origin", + "origin": {"platform": "telegram", "chat_id": "123"}, + } - future.cancel.assert_called_once() + standalone_send = AsyncMock(return_value={"success": True}) + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro), \ + patch("tools.send_message_tool._send_to_platform", new=standalone_send): + result = _deliver_result( + job, + "Hello world", + adapters={Platform.TELEGRAM: adapter}, + loop=loop, + ) + + # 1. The orphan future was cancelled on timeout (the bug fix) + assert cancel_calls == [True], "future.cancel() must fire on TimeoutError" + # 2. The standalone fallback delivered — no double send, no silent drop + assert result is None, f"expected successful delivery, got error: {result!r}" + standalone_send.assert_awaited_once() class TestSendMediaTimeoutCancelsFuture: """Same orphan-coroutine guarantee for _send_media_via_adapter's - future.result(timeout=30) call. + future.result(timeout=30) call. If this times out mid-batch, the + in-flight coroutine must be cancelled before the next file is tried. """ - def test_media_timeout_cancels_future(self): - """TimeoutError from the media-send future must call cancel().""" + def test_media_send_timeout_cancels_future_and_continues(self): + """End-to-end: _send_media_via_adapter with a future whose .result() + raises TimeoutError. Assert cancel() fires and the loop proceeds + to the next file rather than hanging or crashing.""" from concurrent.futures import Future - future = MagicMock(spec=Future) - future.result.side_effect = TimeoutError("timed out") + adapter = MagicMock() + adapter.send_image_file = AsyncMock() + adapter.send_video = AsyncMock() - def fake_run_coro(coro, loop): + # First file: future that times out. Second file: future that resolves OK. + timeout_future = Future() + timeout_cancel_calls = [] + original_cancel = timeout_future.cancel + + def tracking_cancel(): + timeout_cancel_calls.append(True) + return original_cancel() + + timeout_future.cancel = tracking_cancel + timeout_future.result = MagicMock(side_effect=TimeoutError("timed out")) + + ok_future = Future() + ok_future.set_result(MagicMock(success=True)) + + futures_iter = iter([timeout_future, ok_future]) + + def fake_run_coro(coro, _loop): coro.close() - return future + return next(futures_iter) - with patch( - "asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro - ): - with pytest.raises(TimeoutError): - import asyncio - f = asyncio.run_coroutine_threadsafe( - _noop_coro(), MagicMock() - ) - try: - f.result(timeout=30) - except TimeoutError: - f.cancel() - raise + media_files = [ + ("/tmp/slow.png", False), # times out + ("/tmp/fast.mp4", False), # succeeds + ] - future.cancel.assert_called_once() + loop = MagicMock() + job = {"id": "media-timeout"} + + with patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro): + # Should not raise — the except Exception clause swallows the timeout + _send_media_via_adapter(adapter, "chat-1", media_files, None, loop, job) + + # 1. The timed-out future was cancelled (the bug fix) + assert timeout_cancel_calls == [True], "future.cancel() must fire on TimeoutError" + # 2. Second file still got dispatched — one timeout doesn't abort the batch + adapter.send_video.assert_called_once() + assert adapter.send_video.call_args[1]["video_path"] == "/tmp/fast.mp4"