mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
fix(api-call): defer client.close() to owning worker thread on interrupt (#29507)
Layer-2 defense for the FD-recycling race: even with ``force_close_tcp_sockets`` reduced to shutdown-only, the followup ``client.close()`` in ``_close_openai_client`` still walks the httpx pool and closes sockets — and if called from a stranger thread (the interrupt-check loop, the stale-call detector) it has the same FD-recycling exposure that wrote a TLS record on top of ``kanban.db``. Stamp the request_client_holder with the owning thread's ident at ``_set_request_client`` time. In ``_close_request_client_once``: * Owning thread (the worker's ``finally``) → pop + ``client.close()`` via ``_close_request_openai_client``, exactly as before. * Stranger thread → ``_abort_request_openai_client`` (new): only ``shutdown(SHUT_RDWR)`` the pool sockets and log a deferred-close marker. The holder stays populated so the worker's eventual ``finally`` performs the real close from its own thread context, where the FD release races nothing. Applied symmetrically to both the non-streaming ``interruptible_api_call`` and the streaming variant — both routinely get hit by stranger-thread interrupts. The log field ``tcp_force_closed=N`` keeps its existing shape; the new abort path adds ``deferred_close=stranger_thread`` so production triage can distinguish the two close kinds.
This commit is contained in:
parent
e2a7d73a66
commit
30c22f1158
2 changed files with 92 additions and 6 deletions
|
|
@ -91,23 +91,55 @@ def interruptible_api_call(agent, api_kwargs: dict):
|
|||
provider fallback.
|
||||
"""
|
||||
result = {"response": None, "error": None}
|
||||
request_client_holder = {"client": None}
|
||||
request_client_holder = {"client": None, "owner_tid": None}
|
||||
request_client_lock = threading.Lock()
|
||||
|
||||
def _set_request_client(client):
|
||||
with request_client_lock:
|
||||
request_client_holder["client"] = client
|
||||
# #29507: stamp the owning thread so a stranger-thread interrupt
|
||||
# only shuts the connection down rather than racing the worker
|
||||
# for FD ownership during ``client.close()``.
|
||||
request_client_holder["owner_tid"] = threading.get_ident()
|
||||
return client
|
||||
|
||||
def _take_request_client():
|
||||
with request_client_lock:
|
||||
client = request_client_holder.get("client")
|
||||
request_client_holder["client"] = None
|
||||
request_client_holder["owner_tid"] = None
|
||||
return client
|
||||
|
||||
def _close_request_client_once(reason: str) -> None:
|
||||
request_client = _take_request_client()
|
||||
if request_client is not None:
|
||||
# #29507: dispatch on the calling thread.
|
||||
#
|
||||
# When ``_call`` (the worker) reaches its ``finally`` it owns the
|
||||
# close and we pop + fully close as before. When a *stranger* thread
|
||||
# (the interrupt-check loop, the stale-call detector) drives the
|
||||
# close, only shut the sockets down so the worker's blocked
|
||||
# ``recv``/``send`` unwinds with an ``EPIPE`` / EOF — and let the
|
||||
# worker close ``client`` from its own thread on its way out. That
|
||||
# avoids the FD-recycling race where the kernel reassigned a
|
||||
# just-closed TLS socket FD to ``kanban.db``, and the still-live SSL
|
||||
# BIO on the worker thread then wrote a 24-byte TLS application-data
|
||||
# record into the SQLite header (#29507).
|
||||
with request_client_lock:
|
||||
request_client = request_client_holder.get("client")
|
||||
owner_tid = request_client_holder.get("owner_tid")
|
||||
stranger_thread = (
|
||||
request_client is not None
|
||||
and owner_tid is not None
|
||||
and owner_tid != threading.get_ident()
|
||||
)
|
||||
if not stranger_thread:
|
||||
# Owning thread (or no recorded owner) → pop and fully close.
|
||||
request_client_holder["client"] = None
|
||||
request_client_holder["owner_tid"] = None
|
||||
if request_client is None:
|
||||
return
|
||||
if stranger_thread:
|
||||
agent._abort_request_openai_client(request_client, reason=reason)
|
||||
else:
|
||||
agent._close_request_openai_client(request_client, reason=reason)
|
||||
|
||||
def _call():
|
||||
|
|
@ -1274,23 +1306,44 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
|
|||
return result["response"]
|
||||
|
||||
result = {"response": None, "error": None, "partial_tool_names": []}
|
||||
request_client_holder = {"client": None, "diag": None}
|
||||
request_client_holder = {"client": None, "diag": None, "owner_tid": None}
|
||||
request_client_lock = threading.Lock()
|
||||
|
||||
def _set_request_client(client):
|
||||
with request_client_lock:
|
||||
request_client_holder["client"] = client
|
||||
# See #29507 explanation in the non-streaming variant above.
|
||||
request_client_holder["owner_tid"] = threading.get_ident()
|
||||
return client
|
||||
|
||||
def _take_request_client():
|
||||
with request_client_lock:
|
||||
client = request_client_holder.get("client")
|
||||
request_client_holder["client"] = None
|
||||
request_client_holder["owner_tid"] = None
|
||||
return client
|
||||
|
||||
def _close_request_client_once(reason: str) -> None:
|
||||
request_client = _take_request_client()
|
||||
if request_client is not None:
|
||||
# See #29507 explanation in the non-streaming variant above. A
|
||||
# stranger thread (the interrupt-check / stale-stream detector loop)
|
||||
# only aborts sockets — never pops, never calls ``client.close()`` —
|
||||
# so the worker thread retains ownership of the FD release.
|
||||
with request_client_lock:
|
||||
request_client = request_client_holder.get("client")
|
||||
owner_tid = request_client_holder.get("owner_tid")
|
||||
stranger_thread = (
|
||||
request_client is not None
|
||||
and owner_tid is not None
|
||||
and owner_tid != threading.get_ident()
|
||||
)
|
||||
if not stranger_thread:
|
||||
request_client_holder["client"] = None
|
||||
request_client_holder["owner_tid"] = None
|
||||
if request_client is None:
|
||||
return
|
||||
if stranger_thread:
|
||||
agent._abort_request_openai_client(request_client, reason=reason)
|
||||
else:
|
||||
agent._close_request_openai_client(request_client, reason=reason)
|
||||
|
||||
first_delta_fired = {"done": False}
|
||||
|
|
|
|||
33
run_agent.py
33
run_agent.py
|
|
@ -2563,6 +2563,39 @@ class AIAgent:
|
|||
def _close_request_openai_client(self, client: Any, *, reason: str) -> None:
|
||||
self._close_openai_client(client, reason=reason, shared=False)
|
||||
|
||||
def _abort_request_openai_client(self, client: Any, *, reason: str) -> None:
|
||||
"""Cross-thread abort: shut sockets down without releasing FDs.
|
||||
|
||||
Companion to :meth:`_close_request_openai_client` for stranger-thread
|
||||
callers (interrupt-check loop, stale-call detector). Calling
|
||||
``client.close()`` from a thread that does not own the active httpx
|
||||
connection raced the still-live SSL BIO and corrupted unrelated file
|
||||
descriptors when the kernel recycled the just-freed TCP FD (#29507).
|
||||
|
||||
Here we only ``shutdown(SHUT_RDWR)`` the pool's sockets. That unblocks
|
||||
the owning worker thread's pending ``recv``/``send`` with an EOF or
|
||||
``EPIPE`` so it can unwind and close ``client`` from its own context
|
||||
— which is where the FD release belongs.
|
||||
"""
|
||||
if client is None:
|
||||
return
|
||||
try:
|
||||
shutdown_count = self._force_close_tcp_sockets(client)
|
||||
logger.info(
|
||||
"OpenAI client aborted (%s, shared=False, tcp_force_closed=%d, "
|
||||
"deferred_close=stranger_thread) %s",
|
||||
reason,
|
||||
shutdown_count,
|
||||
self._client_log_context(),
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug(
|
||||
"OpenAI client abort failed (%s, shared=False) %s error=%s",
|
||||
reason,
|
||||
self._client_log_context(),
|
||||
exc,
|
||||
)
|
||||
|
||||
def _run_codex_stream(self, api_kwargs: dict, client: Any = None, on_first_delta: callable = None):
|
||||
"""Forwarder — see ``agent.codex_runtime.run_codex_stream``."""
|
||||
from agent.codex_runtime import run_codex_stream
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue