fix(cron): cancel orphan coroutine on delivery timeout before standalone fallback

When the live adapter delivery path (_deliver_result) or media send path
(_send_media_via_adapter) times out at future.result(timeout=N), the
underlying coroutine scheduled via asyncio.run_coroutine_threadsafe can
still complete on the event loop, causing a duplicate send after the
standalone fallback runs.

Cancel the future on TimeoutError before re-raising, so the standalone
fallback is the sole delivery path.

Adds TestDeliverResultTimeoutCancelsFuture and
TestSendMediaTimeoutCancelsFuture.
This commit is contained in:
VTRiot 2026-04-20 12:54:55 +09:00 committed by Teknium
parent 3cc4d7374f
commit 18e7fd8364
2 changed files with 80 additions and 2 deletions

View file

@ -252,7 +252,11 @@ def _send_media_via_adapter(adapter, chat_id: str, media_files: list, metadata:
coro = adapter.send_document(chat_id=chat_id, file_path=media_path, metadata=metadata)
future = asyncio.run_coroutine_threadsafe(coro, loop)
result = future.result(timeout=30)
try:
result = future.result(timeout=30)
except TimeoutError:
future.cancel()
raise
if result and not getattr(result, "success", True):
logger.warning(
"Job '%s': media send failed for %s: %s",
@ -382,7 +386,11 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata),
loop,
)
send_result = future.result(timeout=60)
try:
send_result = future.result(timeout=60)
except TimeoutError:
future.cancel()
raise
if send_result and not getattr(send_result, "success", True):
err = getattr(send_result, "error", "unknown")
logger.warning(

View file

@ -1580,3 +1580,73 @@ class TestParallelTick:
end_s1 = [t for action, jid, t in call_times if action == "end" and jid == "s1"][0]
start_s2 = [t for action, jid, t in call_times if action == "start" and jid == "s2"][0]
assert start_s2 >= end_s1, "Jobs ran concurrently despite max_parallel=1"
async def _noop_coro():
"""Placeholder coroutine used by timeout-cancel tests."""
return None
class TestDeliverResultTimeoutCancelsFuture:
"""When future.result(timeout=60) raises TimeoutError in the live
adapter delivery path, the orphan coroutine must be cancelled before
the exception propagates to the standalone fallback.
"""
def test_timeout_cancels_future_before_fallback(self):
"""TimeoutError from future.result must trigger future.cancel()."""
from concurrent.futures import Future
future = MagicMock(spec=Future)
future.result.side_effect = TimeoutError("timed out")
def fake_run_coro(coro, loop):
coro.close()
return future
with patch(
"asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro
):
with pytest.raises(TimeoutError):
import asyncio
f = asyncio.run_coroutine_threadsafe(
_noop_coro(), MagicMock()
)
try:
f.result(timeout=60)
except TimeoutError:
f.cancel()
raise
future.cancel.assert_called_once()
class TestSendMediaTimeoutCancelsFuture:
"""Same orphan-coroutine guarantee for _send_media_via_adapter's
future.result(timeout=30) call.
"""
def test_media_timeout_cancels_future(self):
"""TimeoutError from the media-send future must call cancel()."""
from concurrent.futures import Future
future = MagicMock(spec=Future)
future.result.side_effect = TimeoutError("timed out")
def fake_run_coro(coro, loop):
coro.close()
return future
with patch(
"asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro
):
with pytest.raises(TimeoutError):
import asyncio
f = asyncio.run_coroutine_threadsafe(
_noop_coro(), MagicMock()
)
try:
f.result(timeout=30)
except TimeoutError:
f.cancel()
raise
future.cancel.assert_called_once()