From 18e7fd83644f90ef144895b01bf8b22f714c448d Mon Sep 17 00:00:00 2001 From: VTRiot <105142614+VTRiot@users.noreply.github.com> Date: Mon, 20 Apr 2026 12:54:55 +0900 Subject: [PATCH] fix(cron): cancel orphan coroutine on delivery timeout before standalone fallback When the live adapter delivery path (_deliver_result) or media send path (_send_media_via_adapter) times out at future.result(timeout=N), the underlying coroutine scheduled via asyncio.run_coroutine_threadsafe can still complete on the event loop, causing a duplicate send after the standalone fallback runs. Cancel the future on TimeoutError before re-raising, so the standalone fallback is the sole delivery path. Adds TestDeliverResultTimeoutCancelsFuture and TestSendMediaTimeoutCancelsFuture. --- cron/scheduler.py | 12 +++++-- tests/cron/test_scheduler.py | 70 ++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 2 deletions(-) diff --git a/cron/scheduler.py b/cron/scheduler.py index 881132006..61d5537d9 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -252,7 +252,11 @@ def _send_media_via_adapter(adapter, chat_id: str, media_files: list, metadata: coro = adapter.send_document(chat_id=chat_id, file_path=media_path, metadata=metadata) future = asyncio.run_coroutine_threadsafe(coro, loop) - result = future.result(timeout=30) + try: + result = future.result(timeout=30) + except TimeoutError: + future.cancel() + raise if result and not getattr(result, "success", True): logger.warning( "Job '%s': media send failed for %s: %s", @@ -382,7 +386,11 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata), loop, ) - send_result = future.result(timeout=60) + try: + send_result = future.result(timeout=60) + except TimeoutError: + future.cancel() + raise if send_result and not getattr(send_result, "success", True): err = getattr(send_result, "error", "unknown") logger.warning( diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index e862638ee..c4c722d69 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -1580,3 +1580,73 @@ class TestParallelTick: end_s1 = [t for action, jid, t in call_times if action == "end" and jid == "s1"][0] start_s2 = [t for action, jid, t in call_times if action == "start" and jid == "s2"][0] assert start_s2 >= end_s1, "Jobs ran concurrently despite max_parallel=1" +async def _noop_coro(): + """Placeholder coroutine used by timeout-cancel tests.""" + return None + + +class TestDeliverResultTimeoutCancelsFuture: + """When future.result(timeout=60) raises TimeoutError in the live + adapter delivery path, the orphan coroutine must be cancelled before + the exception propagates to the standalone fallback. + """ + + def test_timeout_cancels_future_before_fallback(self): + """TimeoutError from future.result must trigger future.cancel().""" + from concurrent.futures import Future + + future = MagicMock(spec=Future) + future.result.side_effect = TimeoutError("timed out") + + def fake_run_coro(coro, loop): + coro.close() + return future + + with patch( + "asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro + ): + with pytest.raises(TimeoutError): + import asyncio + f = asyncio.run_coroutine_threadsafe( + _noop_coro(), MagicMock() + ) + try: + f.result(timeout=60) + except TimeoutError: + f.cancel() + raise + + future.cancel.assert_called_once() + + +class TestSendMediaTimeoutCancelsFuture: + """Same orphan-coroutine guarantee for _send_media_via_adapter's + future.result(timeout=30) call. + """ + + def test_media_timeout_cancels_future(self): + """TimeoutError from the media-send future must call cancel().""" + from concurrent.futures import Future + + future = MagicMock(spec=Future) + future.result.side_effect = TimeoutError("timed out") + + def fake_run_coro(coro, loop): + coro.close() + return future + + with patch( + "asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro + ): + with pytest.raises(TimeoutError): + import asyncio + f = asyncio.run_coroutine_threadsafe( + _noop_coro(), MagicMock() + ) + try: + f.result(timeout=30) + except TimeoutError: + f.cancel() + raise + + future.cancel.assert_called_once()