From 30c22f1158c001cf35ce4a2cb5d2dc188fe43066 Mon Sep 17 00:00:00 2001 From: xxxigm Date: Thu, 21 May 2026 07:20:12 +0700 Subject: [PATCH] fix(api-call): defer client.close() to owning worker thread on interrupt (#29507) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Layer-2 defense for the FD-recycling race: even with ``force_close_tcp_sockets`` reduced to shutdown-only, the followup ``client.close()`` in ``_close_openai_client`` still walks the httpx pool and closes sockets — and if called from a stranger thread (the interrupt-check loop, the stale-call detector) it has the same FD-recycling exposure that wrote a TLS record on top of ``kanban.db``. Stamp the request_client_holder with the owning thread's ident at ``_set_request_client`` time. In ``_close_request_client_once``: * Owning thread (the worker's ``finally``) → pop + ``client.close()`` via ``_close_request_openai_client``, exactly as before. * Stranger thread → ``_abort_request_openai_client`` (new): only ``shutdown(SHUT_RDWR)`` the pool sockets and log a deferred-close marker. The holder stays populated so the worker's eventual ``finally`` performs the real close from its own thread context, where the FD release races nothing. Applied symmetrically to both the non-streaming ``interruptible_api_call`` and the streaming variant — both routinely get hit by stranger-thread interrupts. The log field ``tcp_force_closed=N`` keeps its existing shape; the new abort path adds ``deferred_close=stranger_thread`` so production triage can distinguish the two close kinds. --- agent/chat_completion_helpers.py | 65 +++++++++++++++++++++++++++++--- run_agent.py | 33 ++++++++++++++++ 2 files changed, 92 insertions(+), 6 deletions(-) diff --git a/agent/chat_completion_helpers.py b/agent/chat_completion_helpers.py index f41e45cc820..59e7752a625 100644 --- a/agent/chat_completion_helpers.py +++ b/agent/chat_completion_helpers.py @@ -91,23 +91,55 @@ def interruptible_api_call(agent, api_kwargs: dict): provider fallback. """ result = {"response": None, "error": None} - request_client_holder = {"client": None} + request_client_holder = {"client": None, "owner_tid": None} request_client_lock = threading.Lock() def _set_request_client(client): with request_client_lock: request_client_holder["client"] = client + # #29507: stamp the owning thread so a stranger-thread interrupt + # only shuts the connection down rather than racing the worker + # for FD ownership during ``client.close()``. + request_client_holder["owner_tid"] = threading.get_ident() return client def _take_request_client(): with request_client_lock: client = request_client_holder.get("client") request_client_holder["client"] = None + request_client_holder["owner_tid"] = None return client def _close_request_client_once(reason: str) -> None: - request_client = _take_request_client() - if request_client is not None: + # #29507: dispatch on the calling thread. + # + # When ``_call`` (the worker) reaches its ``finally`` it owns the + # close and we pop + fully close as before. When a *stranger* thread + # (the interrupt-check loop, the stale-call detector) drives the + # close, only shut the sockets down so the worker's blocked + # ``recv``/``send`` unwinds with an ``EPIPE`` / EOF — and let the + # worker close ``client`` from its own thread on its way out. That + # avoids the FD-recycling race where the kernel reassigned a + # just-closed TLS socket FD to ``kanban.db``, and the still-live SSL + # BIO on the worker thread then wrote a 24-byte TLS application-data + # record into the SQLite header (#29507). + with request_client_lock: + request_client = request_client_holder.get("client") + owner_tid = request_client_holder.get("owner_tid") + stranger_thread = ( + request_client is not None + and owner_tid is not None + and owner_tid != threading.get_ident() + ) + if not stranger_thread: + # Owning thread (or no recorded owner) → pop and fully close. + request_client_holder["client"] = None + request_client_holder["owner_tid"] = None + if request_client is None: + return + if stranger_thread: + agent._abort_request_openai_client(request_client, reason=reason) + else: agent._close_request_openai_client(request_client, reason=reason) def _call(): @@ -1274,23 +1306,44 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= return result["response"] result = {"response": None, "error": None, "partial_tool_names": []} - request_client_holder = {"client": None, "diag": None} + request_client_holder = {"client": None, "diag": None, "owner_tid": None} request_client_lock = threading.Lock() def _set_request_client(client): with request_client_lock: request_client_holder["client"] = client + # See #29507 explanation in the non-streaming variant above. + request_client_holder["owner_tid"] = threading.get_ident() return client def _take_request_client(): with request_client_lock: client = request_client_holder.get("client") request_client_holder["client"] = None + request_client_holder["owner_tid"] = None return client def _close_request_client_once(reason: str) -> None: - request_client = _take_request_client() - if request_client is not None: + # See #29507 explanation in the non-streaming variant above. A + # stranger thread (the interrupt-check / stale-stream detector loop) + # only aborts sockets — never pops, never calls ``client.close()`` — + # so the worker thread retains ownership of the FD release. + with request_client_lock: + request_client = request_client_holder.get("client") + owner_tid = request_client_holder.get("owner_tid") + stranger_thread = ( + request_client is not None + and owner_tid is not None + and owner_tid != threading.get_ident() + ) + if not stranger_thread: + request_client_holder["client"] = None + request_client_holder["owner_tid"] = None + if request_client is None: + return + if stranger_thread: + agent._abort_request_openai_client(request_client, reason=reason) + else: agent._close_request_openai_client(request_client, reason=reason) first_delta_fired = {"done": False} diff --git a/run_agent.py b/run_agent.py index 5b89839b645..da121869f8d 100644 --- a/run_agent.py +++ b/run_agent.py @@ -2563,6 +2563,39 @@ class AIAgent: def _close_request_openai_client(self, client: Any, *, reason: str) -> None: self._close_openai_client(client, reason=reason, shared=False) + def _abort_request_openai_client(self, client: Any, *, reason: str) -> None: + """Cross-thread abort: shut sockets down without releasing FDs. + + Companion to :meth:`_close_request_openai_client` for stranger-thread + callers (interrupt-check loop, stale-call detector). Calling + ``client.close()`` from a thread that does not own the active httpx + connection raced the still-live SSL BIO and corrupted unrelated file + descriptors when the kernel recycled the just-freed TCP FD (#29507). + + Here we only ``shutdown(SHUT_RDWR)`` the pool's sockets. That unblocks + the owning worker thread's pending ``recv``/``send`` with an EOF or + ``EPIPE`` so it can unwind and close ``client`` from its own context + — which is where the FD release belongs. + """ + if client is None: + return + try: + shutdown_count = self._force_close_tcp_sockets(client) + logger.info( + "OpenAI client aborted (%s, shared=False, tcp_force_closed=%d, " + "deferred_close=stranger_thread) %s", + reason, + shutdown_count, + self._client_log_context(), + ) + except Exception as exc: + logger.debug( + "OpenAI client abort failed (%s, shared=False) %s error=%s", + reason, + self._client_log_context(), + exc, + ) + def _run_codex_stream(self, api_kwargs: dict, client: Any = None, on_first_delta: callable = None): """Forwarder — see ``agent.codex_runtime.run_codex_stream``.""" from agent.codex_runtime import run_codex_stream