diff --git a/gateway/run.py b/gateway/run.py index 33b9e6e4435..7bf16ffc995 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -139,6 +139,85 @@ def _gateway_platform_value(platform: Any) -> str: return str(getattr(platform, "value", platform) or "").strip().lower() +def _is_transient_network_error(exc: BaseException) -> bool: + """Return True for transient network errors safe to log + swallow. + + The crash class targeted by #31066 / #31110: an unhandled Telegram + ``TimedOut`` (or peer ``NetworkError`` / ``httpx`` connection error) + propagating to the event loop and killing the entire gateway + process. These are by definition transient — the next poll cycle or + user action recovers — so they must never crash the process. + + Walk the exception cause chain so wrapped errors (e.g. PTB's + ``NetworkError`` wrapping ``httpx.ConnectError``) are still + classified. The chain is bounded to avoid pathological cycles. + """ + seen: set[int] = set() + cur: Optional[BaseException] = exc + depth = 0 + transient_class_names = { + "TimedOut", + "NetworkError", + "ReadError", + "WriteError", + "ConnectError", + "ConnectTimeout", + "ReadTimeout", + "WriteTimeout", + "PoolTimeout", + "RemoteProtocolError", + "ServerDisconnectedError", + "ClientConnectorError", + "ClientOSError", + } + while cur is not None and depth < 12: + ident = id(cur) + if ident in seen: + break + seen.add(ident) + depth += 1 + name = type(cur).__name__ + if name in transient_class_names: + return True + cur = cur.__cause__ or cur.__context__ + return False + + +def _gateway_loop_exception_handler( + loop: "asyncio.AbstractEventLoop", context: Dict[str, Any] +) -> None: + """Loop-level safety net for transient network errors. + + Installed once during :func:`start_gateway`. Catches the + ``telegram.error.TimedOut`` crash class (issues #31066 / #31110) + and any peer transient network error before it can kill the + gateway process. Logs at WARNING with full traceback so the + originating call site stays diagnosable; non-transient errors + are forwarded to the default loop handler so real bugs still + surface. + """ + exc = context.get("exception") + if exc is not None and _is_transient_network_error(exc): + message = context.get("message") or "transient network error" + task = context.get("future") or context.get("task") + task_name = "" + if task is not None: + try: + task_name = task.get_name() if hasattr(task, "get_name") else repr(task) + except Exception: + task_name = repr(task) + logger.warning( + "Gateway swallowed transient network error from %s: %s: %s", + task_name or "", + type(exc).__name__, + exc, + exc_info=(type(exc), exc, exc.__traceback__), + ) + return + # Fall back to the default handler for anything we don't recognise. + loop.default_exception_handler(context) + + def _redact_gateway_user_facing_secrets(text: str) -> str: """Best-effort secret redaction before text can leave the gateway.""" redacted = str(text or "") @@ -18140,6 +18219,21 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = runner.request_restart(detached=False, via_service=True) loop = asyncio.get_running_loop() + + # Install a loop-level exception handler that swallows transient + # network errors from background tasks. Issues #31066 / #31110: + # an unhandled ``telegram.error.TimedOut`` (or peer NetworkError / + # httpx connection error) in any awaited coroutine would propagate + # to the loop and kill the gateway process, taking down every + # profile attached to the same runner. systemd then restarts the + # service after ~5s but the active conversation turn is lost. + # + # The fix is intentionally narrow: only well-known transient + # network errors are swallowed (and logged with full traceback so + # the originating call site is still discoverable). Anything else + # is forwarded to the default handler so real bugs still surface. + loop.set_exception_handler(_gateway_loop_exception_handler) + if threading.current_thread() is threading.main_thread(): for sig in (signal.SIGINT, signal.SIGTERM): try: diff --git a/tests/gateway/test_loop_exception_handler.py b/tests/gateway/test_loop_exception_handler.py new file mode 100644 index 00000000000..66ba4d94304 --- /dev/null +++ b/tests/gateway/test_loop_exception_handler.py @@ -0,0 +1,210 @@ +"""Tests for the gateway loop-level transient-network-error safety net. + +Issues #31066 / #31110: unhandled ``telegram.error.TimedOut`` (or peer +``NetworkError`` / ``httpx`` connection error) propagating to the +asyncio event loop killed the gateway process, taking down every +profile attached to the same runner. The safety net installed in +:func:`gateway.run.start_gateway` catches the transient crash class +and logs+swallows it; non-transient errors still surface. + +These tests pin the classifier and the loop handler so the safety net +can't silently regress to swallowing every exception. +""" + +from __future__ import annotations + +import asyncio +import logging + +import pytest + +from gateway.run import ( + _gateway_loop_exception_handler, + _is_transient_network_error, +) + + +# ----- Fake exception classes that mimic the real wire types ---------- +# We avoid importing telegram / httpx here so the test runs in environments +# without those packages installed (the classifier matches on class name). + +class TimedOut(Exception): + """Stand-in for ``telegram.error.TimedOut``.""" + + +class NetworkError(Exception): + """Stand-in for ``telegram.error.NetworkError``.""" + + +class ConnectError(Exception): + """Stand-in for ``httpx.ConnectError``.""" + + +class ReadTimeout(Exception): + """Stand-in for ``httpx.ReadTimeout``.""" + + +class PoolTimeout(Exception): + """Stand-in for ``httpx.PoolTimeout``.""" + + +class ClientConnectorError(Exception): + """Stand-in for ``aiohttp.ClientConnectorError``.""" + + +class SomeUnrelatedBug(Exception): + """A non-transient error that should NOT be swallowed.""" + + +# --------------------------------------------------------------------- +# Classifier +# --------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "exc_cls", + [ + TimedOut, + NetworkError, + ConnectError, + ReadTimeout, + PoolTimeout, + ClientConnectorError, + ], +) +def test_transient_classifier_matches_known_network_errors(exc_cls): + """Every well-known transient network exception class is classified.""" + assert _is_transient_network_error(exc_cls("boom")) is True + + +def test_transient_classifier_rejects_unrelated_errors(): + """Real bugs (ValueError, KeyError, custom app errors) are NOT swallowed.""" + for exc in (ValueError("bad"), KeyError("missing"), SomeUnrelatedBug("x")): + assert _is_transient_network_error(exc) is False + + +def test_transient_classifier_unwraps_cause_chain(): + """A NetworkError wrapping a ConnectError is still classified.""" + inner = ConnectError("connection refused") + outer = NetworkError("upstream failed") + outer.__cause__ = inner + assert _is_transient_network_error(outer) is True + + +def test_transient_classifier_unwraps_context_chain(): + """Implicit ``__context__`` wrapping is also unwrapped.""" + try: + try: + raise TimedOut("upstream timeout") + except TimedOut: + # Re-raise something else with the original as implicit context + raise SomeUnrelatedBug("wrapper") + except SomeUnrelatedBug as e: + wrapped = e + # The wrapper class name is not transient, but the chained context is. + assert _is_transient_network_error(wrapped) is True + + +def test_transient_classifier_does_not_infinite_loop_on_cyclic_cause(): + """A pathological self-referential cause chain terminates.""" + exc = SomeUnrelatedBug("loop") + exc.__cause__ = exc # cycle + # Must return without hanging. + assert _is_transient_network_error(exc) is False + + +# --------------------------------------------------------------------- +# Loop handler +# --------------------------------------------------------------------- + + +def test_handler_swallows_transient_error_and_logs_warning(caplog): + """Transient errors are logged at WARNING but not re-raised.""" + loop = asyncio.new_event_loop() + try: + with caplog.at_level(logging.WARNING, logger="gateway.run"): + _gateway_loop_exception_handler( + loop, + { + "message": "Task exception was never retrieved", + "exception": TimedOut("Timed out"), + }, + ) + # Warning emitted, exception class name appears in the log. + assert any("TimedOut" in r.message for r in caplog.records) + finally: + loop.close() + + +def test_handler_delegates_unknown_errors_to_default(monkeypatch): + """A non-transient error is forwarded to ``loop.default_exception_handler``.""" + loop = asyncio.new_event_loop() + try: + forwarded: list[dict] = [] + + def fake_default(ctx): + forwarded.append(ctx) + + monkeypatch.setattr(loop, "default_exception_handler", fake_default) + + context = { + "message": "Something else broke", + "exception": SomeUnrelatedBug("real bug"), + } + _gateway_loop_exception_handler(loop, context) + assert forwarded == [context] + finally: + loop.close() + + +def test_handler_tolerates_missing_exception_key(monkeypatch): + """Contexts without an ``exception`` key fall through to the default handler.""" + loop = asyncio.new_event_loop() + try: + forwarded: list[dict] = [] + monkeypatch.setattr( + loop, "default_exception_handler", lambda ctx: forwarded.append(ctx) + ) + ctx = {"message": "warning without exception"} + _gateway_loop_exception_handler(loop, ctx) + assert forwarded == [ctx] + finally: + loop.close() + + +# --------------------------------------------------------------------- +# End-to-end: task-level +# --------------------------------------------------------------------- + + +def test_unhandled_transient_error_in_task_does_not_propagate_to_loop(): + """Smoke test the wiring as a loop would actually use it. + + Schedules a task that raises TimedOut and is never awaited. With the + handler installed, the loop completes normally and logs a warning + instead of dying. Without the handler, asyncio would emit + ``Task exception was never retrieved`` and (depending on Python's + debug mode) potentially escalate. + """ + + async def raiser(): + raise TimedOut("upstream timeout") + + async def main(): + loop = asyncio.get_running_loop() + loop.set_exception_handler(_gateway_loop_exception_handler) + task = loop.create_task(raiser()) + # Give the task a tick to run and raise. + await asyncio.sleep(0) + # Don't await ``task`` — let it become an unhandled-exception task. + del task + import gc + + gc.collect() + await asyncio.sleep(0) + + # If the safety net works, this returns cleanly. If not, the test + # would still pass (asyncio's default is a warning, not a crash) — + # the real assertion is that no unhandled exception escapes the + # ``run`` boundary. + asyncio.run(main())