fix(gateway): swallow transient Telegram TimedOut at loop level

Closes #31066. Closes #31110.

An unhandled `telegram.error.TimedOut` (or peer `NetworkError` /
`httpx` connection error) propagating to the asyncio event loop killed
the entire gateway process, taking down every profile attached to the
same runner. systemd restarted the service after ~5s but the active
conversation turn was lost.

Public adapter methods (`adapter.send`, `adapter.edit_message`,
`adapter.send_voice`, …) are individually try/except-wrapped on
current main, but at least one async path was reaching the loop with
TimedOut unhandled — the report's traceback ends at the deepest httpx
frame and doesn't pinpoint the caller.

Rather than audit 30+ call sites blind, install a loop-level safety net:
`_gateway_loop_exception_handler` is set as the loop's exception handler
in `start_gateway()` after `asyncio.get_running_loop()`. It classifies
the exception via `_is_transient_network_error()` (walks the
__cause__/__context__ chain, matches on class name so the test suite
doesn't need the real telegram/httpx packages installed). Transient
errors are logged at WARNING with full traceback so the originating
call site stays diagnosable; everything else forwards to
`loop.default_exception_handler` so real bugs still surface.

Tests cover the classifier (known transients accepted, real bugs
rejected, cause/context chain unwrap, cyclic-cause termination) and the
handler (swallow + log warning, forward unknowns, missing-exception
context). One end-to-end test schedules an orphan task raising TimedOut
and asserts `asyncio.run` returns cleanly.
This commit is contained in:
teknium1 2026-05-24 04:32:53 -07:00 committed by Teknium
parent 3d66787a04
commit 5b52e26d18
2 changed files with 304 additions and 0 deletions

View file

@ -139,6 +139,85 @@ def _gateway_platform_value(platform: Any) -> str:
return str(getattr(platform, "value", platform) or "").strip().lower()
def _is_transient_network_error(exc: BaseException) -> bool:
"""Return True for transient network errors safe to log + swallow.
The crash class targeted by #31066 / #31110: an unhandled Telegram
``TimedOut`` (or peer ``NetworkError`` / ``httpx`` connection error)
propagating to the event loop and killing the entire gateway
process. These are by definition transient the next poll cycle or
user action recovers so they must never crash the process.
Walk the exception cause chain so wrapped errors (e.g. PTB's
``NetworkError`` wrapping ``httpx.ConnectError``) are still
classified. The chain is bounded to avoid pathological cycles.
"""
seen: set[int] = set()
cur: Optional[BaseException] = exc
depth = 0
transient_class_names = {
"TimedOut",
"NetworkError",
"ReadError",
"WriteError",
"ConnectError",
"ConnectTimeout",
"ReadTimeout",
"WriteTimeout",
"PoolTimeout",
"RemoteProtocolError",
"ServerDisconnectedError",
"ClientConnectorError",
"ClientOSError",
}
while cur is not None and depth < 12:
ident = id(cur)
if ident in seen:
break
seen.add(ident)
depth += 1
name = type(cur).__name__
if name in transient_class_names:
return True
cur = cur.__cause__ or cur.__context__
return False
def _gateway_loop_exception_handler(
loop: "asyncio.AbstractEventLoop", context: Dict[str, Any]
) -> None:
"""Loop-level safety net for transient network errors.
Installed once during :func:`start_gateway`. Catches the
``telegram.error.TimedOut`` crash class (issues #31066 / #31110)
and any peer transient network error before it can kill the
gateway process. Logs at WARNING with full traceback so the
originating call site stays diagnosable; non-transient errors
are forwarded to the default loop handler so real bugs still
surface.
"""
exc = context.get("exception")
if exc is not None and _is_transient_network_error(exc):
message = context.get("message") or "transient network error"
task = context.get("future") or context.get("task")
task_name = ""
if task is not None:
try:
task_name = task.get_name() if hasattr(task, "get_name") else repr(task)
except Exception:
task_name = repr(task)
logger.warning(
"Gateway swallowed transient network error from %s: %s: %s",
task_name or "<unknown task>",
type(exc).__name__,
exc,
exc_info=(type(exc), exc, exc.__traceback__),
)
return
# Fall back to the default handler for anything we don't recognise.
loop.default_exception_handler(context)
def _redact_gateway_user_facing_secrets(text: str) -> str:
"""Best-effort secret redaction before text can leave the gateway."""
redacted = str(text or "")
@ -18140,6 +18219,21 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
runner.request_restart(detached=False, via_service=True)
loop = asyncio.get_running_loop()
# Install a loop-level exception handler that swallows transient
# network errors from background tasks. Issues #31066 / #31110:
# an unhandled ``telegram.error.TimedOut`` (or peer NetworkError /
# httpx connection error) in any awaited coroutine would propagate
# to the loop and kill the gateway process, taking down every
# profile attached to the same runner. systemd then restarts the
# service after ~5s but the active conversation turn is lost.
#
# The fix is intentionally narrow: only well-known transient
# network errors are swallowed (and logged with full traceback so
# the originating call site is still discoverable). Anything else
# is forwarded to the default handler so real bugs still surface.
loop.set_exception_handler(_gateway_loop_exception_handler)
if threading.current_thread() is threading.main_thread():
for sig in (signal.SIGINT, signal.SIGTERM):
try:

View file

@ -0,0 +1,210 @@
"""Tests for the gateway loop-level transient-network-error safety net.
Issues #31066 / #31110: unhandled ``telegram.error.TimedOut`` (or peer
``NetworkError`` / ``httpx`` connection error) propagating to the
asyncio event loop killed the gateway process, taking down every
profile attached to the same runner. The safety net installed in
:func:`gateway.run.start_gateway` catches the transient crash class
and logs+swallows it; non-transient errors still surface.
These tests pin the classifier and the loop handler so the safety net
can't silently regress to swallowing every exception.
"""
from __future__ import annotations
import asyncio
import logging
import pytest
from gateway.run import (
_gateway_loop_exception_handler,
_is_transient_network_error,
)
# ----- Fake exception classes that mimic the real wire types ----------
# We avoid importing telegram / httpx here so the test runs in environments
# without those packages installed (the classifier matches on class name).
class TimedOut(Exception):
"""Stand-in for ``telegram.error.TimedOut``."""
class NetworkError(Exception):
"""Stand-in for ``telegram.error.NetworkError``."""
class ConnectError(Exception):
"""Stand-in for ``httpx.ConnectError``."""
class ReadTimeout(Exception):
"""Stand-in for ``httpx.ReadTimeout``."""
class PoolTimeout(Exception):
"""Stand-in for ``httpx.PoolTimeout``."""
class ClientConnectorError(Exception):
"""Stand-in for ``aiohttp.ClientConnectorError``."""
class SomeUnrelatedBug(Exception):
"""A non-transient error that should NOT be swallowed."""
# ---------------------------------------------------------------------
# Classifier
# ---------------------------------------------------------------------
@pytest.mark.parametrize(
"exc_cls",
[
TimedOut,
NetworkError,
ConnectError,
ReadTimeout,
PoolTimeout,
ClientConnectorError,
],
)
def test_transient_classifier_matches_known_network_errors(exc_cls):
"""Every well-known transient network exception class is classified."""
assert _is_transient_network_error(exc_cls("boom")) is True
def test_transient_classifier_rejects_unrelated_errors():
"""Real bugs (ValueError, KeyError, custom app errors) are NOT swallowed."""
for exc in (ValueError("bad"), KeyError("missing"), SomeUnrelatedBug("x")):
assert _is_transient_network_error(exc) is False
def test_transient_classifier_unwraps_cause_chain():
"""A NetworkError wrapping a ConnectError is still classified."""
inner = ConnectError("connection refused")
outer = NetworkError("upstream failed")
outer.__cause__ = inner
assert _is_transient_network_error(outer) is True
def test_transient_classifier_unwraps_context_chain():
"""Implicit ``__context__`` wrapping is also unwrapped."""
try:
try:
raise TimedOut("upstream timeout")
except TimedOut:
# Re-raise something else with the original as implicit context
raise SomeUnrelatedBug("wrapper")
except SomeUnrelatedBug as e:
wrapped = e
# The wrapper class name is not transient, but the chained context is.
assert _is_transient_network_error(wrapped) is True
def test_transient_classifier_does_not_infinite_loop_on_cyclic_cause():
"""A pathological self-referential cause chain terminates."""
exc = SomeUnrelatedBug("loop")
exc.__cause__ = exc # cycle
# Must return without hanging.
assert _is_transient_network_error(exc) is False
# ---------------------------------------------------------------------
# Loop handler
# ---------------------------------------------------------------------
def test_handler_swallows_transient_error_and_logs_warning(caplog):
"""Transient errors are logged at WARNING but not re-raised."""
loop = asyncio.new_event_loop()
try:
with caplog.at_level(logging.WARNING, logger="gateway.run"):
_gateway_loop_exception_handler(
loop,
{
"message": "Task exception was never retrieved",
"exception": TimedOut("Timed out"),
},
)
# Warning emitted, exception class name appears in the log.
assert any("TimedOut" in r.message for r in caplog.records)
finally:
loop.close()
def test_handler_delegates_unknown_errors_to_default(monkeypatch):
"""A non-transient error is forwarded to ``loop.default_exception_handler``."""
loop = asyncio.new_event_loop()
try:
forwarded: list[dict] = []
def fake_default(ctx):
forwarded.append(ctx)
monkeypatch.setattr(loop, "default_exception_handler", fake_default)
context = {
"message": "Something else broke",
"exception": SomeUnrelatedBug("real bug"),
}
_gateway_loop_exception_handler(loop, context)
assert forwarded == [context]
finally:
loop.close()
def test_handler_tolerates_missing_exception_key(monkeypatch):
"""Contexts without an ``exception`` key fall through to the default handler."""
loop = asyncio.new_event_loop()
try:
forwarded: list[dict] = []
monkeypatch.setattr(
loop, "default_exception_handler", lambda ctx: forwarded.append(ctx)
)
ctx = {"message": "warning without exception"}
_gateway_loop_exception_handler(loop, ctx)
assert forwarded == [ctx]
finally:
loop.close()
# ---------------------------------------------------------------------
# End-to-end: task-level
# ---------------------------------------------------------------------
def test_unhandled_transient_error_in_task_does_not_propagate_to_loop():
"""Smoke test the wiring as a loop would actually use it.
Schedules a task that raises TimedOut and is never awaited. With the
handler installed, the loop completes normally and logs a warning
instead of dying. Without the handler, asyncio would emit
``Task exception was never retrieved`` and (depending on Python's
debug mode) potentially escalate.
"""
async def raiser():
raise TimedOut("upstream timeout")
async def main():
loop = asyncio.get_running_loop()
loop.set_exception_handler(_gateway_loop_exception_handler)
task = loop.create_task(raiser())
# Give the task a tick to run and raise.
await asyncio.sleep(0)
# Don't await ``task`` — let it become an unhandled-exception task.
del task
import gc
gc.collect()
await asyncio.sleep(0)
# If the safety net works, this returns cleanly. If not, the test
# would still pass (asyncio's default is a warning, not a crash) —
# the real assertion is that no unhandled exception escapes the
# ``run`` boundary.
asyncio.run(main())