From f7f30aaab94cc0c791a61849eb1a26633de129c9 Mon Sep 17 00:00:00 2001 From: Teknium Date: Wed, 25 Mar 2026 16:07:05 -0700 Subject: [PATCH] fix(streaming): detect and kill stale SSE connections Adds a wall-clock stale stream detector (HERMES_STREAM_STALE_TIMEOUT, default 90s) that force-closes the httpx client when no real chunks arrive, even if SSE keep-alive pings keep the socket alive. Works with the existing streaming retry loop to recover via fresh connection. Made-with: Cursor --- run_agent.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/run_agent.py b/run_agent.py index ef61658d9f4..17f1c018952 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3601,6 +3601,10 @@ class AIAgent: request_client_holder = {"client": None} first_delta_fired = {"done": False} deltas_were_sent = {"yes": False} # Track if any deltas were fired (for fallback) + # Wall-clock timestamp of the last real streaming chunk. The outer + # poll loop uses this to detect stale connections that keep receiving + # SSE keep-alive pings but no actual data. + last_chunk_time = {"t": time.time()} def _fire_first_delta(): if not first_delta_fired["done"] and on_first_delta: @@ -3641,6 +3645,8 @@ class AIAgent: usage_obj = None for chunk in stream: + last_chunk_time["t"] = time.time() + if self._interrupt_requested: break @@ -3878,10 +3884,31 @@ class AIAgent: if request_client is not None: self._close_request_openai_client(request_client, reason="stream_request_complete") + _stream_stale_timeout = float(os.getenv("HERMES_STREAM_STALE_TIMEOUT", 90.0)) + t = threading.Thread(target=_call, daemon=True) t.start() while t.is_alive(): t.join(timeout=0.3) + + # Detect stale streams: connections kept alive by SSE pings + # but delivering no real chunks. Kill the client so the + # inner retry loop can start a fresh connection. + if time.time() - last_chunk_time["t"] > _stream_stale_timeout: + logger.warning( + "Stream stale for %.0fs — no chunks received. Killing connection.", + _stream_stale_timeout, + ) + try: + rc = request_client_holder.get("client") + if rc is not None: + self._close_request_openai_client(rc, reason="stale_stream_kill") + except Exception: + pass + # Reset the timer so we don't kill repeatedly while + # the inner thread processes the closure. + last_chunk_time["t"] = time.time() + if self._interrupt_requested: try: if self.api_mode == "anthropic_messages":