From 5f309ae685d08a2d23eaa992a5bca1f70f52486a Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Sat, 16 May 2026 17:59:32 -0700 Subject: [PATCH] refactor(run_agent): extract OpenAI proxy, safe stdio, IterationBudget MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three small extractions into focused modules: * agent/process_bootstrap.py — _OpenAIProxy (lazy openai.OpenAI import), _SafeWriter (broken-pipe-resistant stdio wrapper), _install_safe_stdio, _get_proxy_from_env, _get_proxy_for_base_url. All process / IO bootstrap. * agent/iteration_budget.py — IterationBudget class (thread-safe consume/refund counter used by both the parent agent and subagents). run_agent re-exports every name so existing test patches like patch('run_agent.OpenAI', ...) and 'from run_agent import IterationBudget' keep working unchanged. Verified the patch-rebinding contract for OpenAI explicitly. tests/run_agent/ + tests/agent/test_gemini_fast_fallback.py: 1347 passed, 3 skipped. run_agent.py: 15427 -> 15261 lines (-166). --- agent/iteration_budget.py | 62 +++++++++++++ agent/process_bootstrap.py | 167 ++++++++++++++++++++++++++++++++++ run_agent.py | 179 +++---------------------------------- 3 files changed, 241 insertions(+), 167 deletions(-) create mode 100644 agent/iteration_budget.py create mode 100644 agent/process_bootstrap.py diff --git a/agent/iteration_budget.py b/agent/iteration_budget.py new file mode 100644 index 00000000000..213b97c0226 --- /dev/null +++ b/agent/iteration_budget.py @@ -0,0 +1,62 @@ +"""Per-agent iteration budget — thread-safe consume/refund counter. + +Extracted from ``run_agent.py``. Each ``AIAgent`` instance (parent or +subagent) holds an :class:`IterationBudget`; the parent's cap comes from +``max_iterations`` (default 90), each subagent's cap comes from +``delegation.max_iterations`` (default 50). 
+ +``run_agent`` re-exports ``IterationBudget`` so existing +``from run_agent import IterationBudget`` imports keep working unchanged. +""" + +from __future__ import annotations + +import threading + + +class IterationBudget: + """Thread-safe iteration counter for an agent. + + Each agent (parent or subagent) gets its own ``IterationBudget``. + The parent's budget is capped at ``max_iterations`` (default 90). + Each subagent gets an independent budget capped at + ``delegation.max_iterations`` (default 50) — this means total + iterations across parent + subagents can exceed the parent's cap. + Users control the per-subagent limit via ``delegation.max_iterations`` + in config.yaml. + + ``execute_code`` (programmatic tool calling) iterations are refunded via + :meth:`refund` so they don't eat into the budget. + """ + + def __init__(self, max_total: int): + self.max_total = max_total + self._used = 0 + self._lock = threading.Lock() + + def consume(self) -> bool: + """Try to consume one iteration. Returns True if allowed.""" + with self._lock: + if self._used >= self.max_total: + return False + self._used += 1 + return True + + def refund(self) -> None: + """Give back one iteration (e.g. for execute_code turns).""" + with self._lock: + if self._used > 0: + self._used -= 1 + + @property + def used(self) -> int: + with self._lock: + return self._used + + @property + def remaining(self) -> int: + with self._lock: + return max(0, self.max_total - self._used) + + +__all__ = ["IterationBudget"] diff --git a/agent/process_bootstrap.py b/agent/process_bootstrap.py new file mode 100644 index 00000000000..fdd9053f5d8 --- /dev/null +++ b/agent/process_bootstrap.py @@ -0,0 +1,167 @@ +"""Process-level bootstrap helpers for ``run_agent``. + +Three concerns, all tied to ``AIAgent`` boot-time / runtime IO setup: + +1. 
**Lazy OpenAI SDK import** — ``_load_openai_cls`` + ``_OpenAIProxy`` + defer the 240ms-ish ``from openai import OpenAI`` cost until first use, + while preserving ``isinstance(client, OpenAI)`` checks and + ``patch("run_agent.OpenAI", ...)`` test patterns. + +2. **Crash-resistant stdio** — ``_SafeWriter`` wraps stdout/stderr so + ``OSError: Input/output error`` from broken pipes (systemd, Docker, + thread teardown races) cannot crash the agent. ``_install_safe_stdio`` + applies the wrapper. + +3. **HTTP proxy resolution** — ``_get_proxy_from_env`` reads + ``HTTPS_PROXY`` / ``HTTP_PROXY`` / ``ALL_PROXY``; + ``_get_proxy_for_base_url`` respects ``NO_PROXY`` for the given base URL. + +``run_agent`` re-exports every name so existing +``from run_agent import _get_proxy_from_env`` imports keep working +unchanged. +""" + +from __future__ import annotations + +import os +import sys +import urllib.request +from typing import Optional + +from utils import base_url_hostname, normalize_proxy_url + + +# Cached at module level so we only pay the OpenAI SDK import cost once +# per process (after the first lazy load). +_OPENAI_CLS_CACHE = None + + +def _load_openai_cls() -> type: + """Import and cache ``openai.OpenAI``.""" + global _OPENAI_CLS_CACHE + if _OPENAI_CLS_CACHE is None: + from openai import OpenAI as _cls + _OPENAI_CLS_CACHE = _cls + return _OPENAI_CLS_CACHE + + +class _OpenAIProxy: + """Module-level proxy that looks like ``openai.OpenAI`` but imports lazily.""" + + __slots__ = () + + def __call__(self, *args, **kwargs): + return _load_openai_cls()(*args, **kwargs) + + def __instancecheck__(self, obj): + return isinstance(obj, _load_openai_cls()) + + def __repr__(self): + return "" + + +class _SafeWriter: + """Transparent stdio wrapper that catches OSError/ValueError from broken pipes. + + When hermes-agent runs as a systemd service, Docker container, or headless + daemon, the stdout/stderr pipe can become unavailable (idle timeout, buffer + exhaustion, socket reset). 
Any print() call then raises + ``OSError: [Errno 5] Input/output error``, which can crash agent setup or + run_conversation() — especially via double-fault when an except handler + also tries to print. + + Additionally, when subagents run in ThreadPoolExecutor threads, the shared + stdout handle can close between thread teardown and cleanup, raising + ``ValueError: I/O operation on closed file`` instead of OSError. + + This wrapper delegates all writes to the underlying stream and silently + catches both OSError and ValueError. It is transparent when the wrapped + stream is healthy. + """ + + __slots__ = ("_inner",) + + def __init__(self, inner): + object.__setattr__(self, "_inner", inner) + + def write(self, data): + try: + return self._inner.write(data) + except (OSError, ValueError): + return len(data) if isinstance(data, str) else 0 + + def flush(self): + try: + self._inner.flush() + except (OSError, ValueError): + pass + + def fileno(self): + return self._inner.fileno() + + def isatty(self): + try: + return self._inner.isatty() + except (OSError, ValueError): + return False + + def __getattr__(self, name): + return getattr(self._inner, name) + + +def _get_proxy_from_env() -> Optional[str]: + """Read proxy URL from environment variables. + + Checks HTTPS_PROXY, HTTP_PROXY, ALL_PROXY (and lowercase variants) in order. + Returns the first valid proxy URL found, or None if no proxy is configured. 
+ """ + for key in ("HTTPS_PROXY", "HTTP_PROXY", "ALL_PROXY", + "https_proxy", "http_proxy", "all_proxy"): + value = os.environ.get(key, "").strip() + if value: + return normalize_proxy_url(value) + return None + + +def _get_proxy_for_base_url(base_url: Optional[str]) -> Optional[str]: + """Return an env-configured proxy unless NO_PROXY excludes this base URL.""" + proxy = _get_proxy_from_env() + if not proxy or not base_url: + return proxy + + host = base_url_hostname(base_url) + if not host: + return proxy + + try: + if urllib.request.proxy_bypass_environment(host): + return None + except Exception: + pass + + return proxy + + +def _install_safe_stdio() -> None: + """Wrap stdout/stderr so best-effort console output cannot crash the agent.""" + for stream_name in ("stdout", "stderr"): + stream = getattr(sys, stream_name, None) + if stream is not None and not isinstance(stream, _SafeWriter): + setattr(sys, stream_name, _SafeWriter(stream)) + + +# Module-level proxy instance — drops in for ``openai.OpenAI``. Imported as +# ``from agent.process_bootstrap import OpenAI`` (or re-exported via +# ``run_agent`` for legacy tests). +OpenAI = _OpenAIProxy() + + +__all__ = [ + "OpenAI", + "_OpenAIProxy", + "_load_openai_cls", + "_SafeWriter", + "_install_safe_stdio", + "_get_proxy_from_env", + "_get_proxy_for_base_url", +] diff --git a/run_agent.py b/run_agent.py index eed7550c468..22848b2f20e 100644 --- a/run_agent.py +++ b/run_agent.py @@ -70,38 +70,20 @@ from pathlib import Path from hermes_constants import get_hermes_home - -_OPENAI_CLS_CACHE: Optional[type] = None +# OpenAI lazy proxy + safe stdio + proxy URL helpers — see agent/process_bootstrap.py. +# `OpenAI` is re-exported here so `patch("run_agent.OpenAI", ...)` in tests works. 
+from agent.process_bootstrap import ( + OpenAI, + _OpenAIProxy, + _load_openai_cls, + _SafeWriter, + _install_safe_stdio, + _get_proxy_from_env, + _get_proxy_for_base_url, +) +from agent.iteration_budget import IterationBudget -def _load_openai_cls() -> type: - """Import and cache ``openai.OpenAI``.""" - global _OPENAI_CLS_CACHE - if _OPENAI_CLS_CACHE is None: - from openai import OpenAI as _cls - _OPENAI_CLS_CACHE = _cls - return _OPENAI_CLS_CACHE - - -class _OpenAIProxy: - """Module-level proxy that looks like ``openai.OpenAI`` but imports lazily.""" - - __slots__ = () - - def __call__(self, *args, **kwargs): - return _load_openai_cls()(*args, **kwargs) - - def __instancecheck__(self, obj): - return isinstance(obj, _load_openai_cls()) - - def __repr__(self): - return "" - - -OpenAI = _OpenAIProxy() - -# Load .env from ~/.hermes/.env first, then project root as dev fallback. -# User-managed env files should override stale shell exports on restart. from hermes_cli.env_loader import load_hermes_dotenv from hermes_cli.timeouts import ( get_provider_request_timeout, @@ -224,143 +206,6 @@ from hermes_cli.config import cfg_get -class _SafeWriter: - """Transparent stdio wrapper that catches OSError/ValueError from broken pipes. - - When hermes-agent runs as a systemd service, Docker container, or headless - daemon, the stdout/stderr pipe can become unavailable (idle timeout, buffer - exhaustion, socket reset). Any print() call then raises - ``OSError: [Errno 5] Input/output error``, which can crash agent setup or - run_conversation() — especially via double-fault when an except handler - also tries to print. - - Additionally, when subagents run in ThreadPoolExecutor threads, the shared - stdout handle can close between thread teardown and cleanup, raising - ``ValueError: I/O operation on closed file`` instead of OSError. - - This wrapper delegates all writes to the underlying stream and silently - catches both OSError and ValueError. 
It is transparent when the wrapped - stream is healthy. - """ - - __slots__ = ("_inner",) - - def __init__(self, inner): - object.__setattr__(self, "_inner", inner) - - def write(self, data): - try: - return self._inner.write(data) - except (OSError, ValueError): - return len(data) if isinstance(data, str) else 0 - - def flush(self): - try: - self._inner.flush() - except (OSError, ValueError): - pass - - def fileno(self): - return self._inner.fileno() - - def isatty(self): - try: - return self._inner.isatty() - except (OSError, ValueError): - return False - - def __getattr__(self, name): - return getattr(self._inner, name) - - -def _get_proxy_from_env() -> Optional[str]: - """Read proxy URL from environment variables. - - Checks HTTPS_PROXY, HTTP_PROXY, ALL_PROXY (and lowercase variants) in order. - Returns the first valid proxy URL found, or None if no proxy is configured. - """ - for key in ("HTTPS_PROXY", "HTTP_PROXY", "ALL_PROXY", - "https_proxy", "http_proxy", "all_proxy"): - value = os.environ.get(key, "").strip() - if value: - return normalize_proxy_url(value) - return None - - -def _get_proxy_for_base_url(base_url: Optional[str]) -> Optional[str]: - """Return an env-configured proxy unless NO_PROXY excludes this base URL.""" - proxy = _get_proxy_from_env() - if not proxy or not base_url: - return proxy - - host = base_url_hostname(base_url) - if not host: - return proxy - - try: - if urllib.request.proxy_bypass_environment(host): - return None - except Exception: - pass - - return proxy - - -def _install_safe_stdio() -> None: - """Wrap stdout/stderr so best-effort console output cannot crash the agent.""" - for stream_name in ("stdout", "stderr"): - stream = getattr(sys, stream_name, None) - if stream is not None and not isinstance(stream, _SafeWriter): - setattr(sys, stream_name, _SafeWriter(stream)) - - -class IterationBudget: - """Thread-safe iteration counter for an agent. - - Each agent (parent or subagent) gets its own ``IterationBudget``. 
- The parent's budget is capped at ``max_iterations`` (default 90). - Each subagent gets an independent budget capped at - ``delegation.max_iterations`` (default 50) — this means total - iterations across parent + subagents can exceed the parent's cap. - Users control the per-subagent limit via ``delegation.max_iterations`` - in config.yaml. - - ``execute_code`` (programmatic tool calling) iterations are refunded via - :meth:`refund` so they don't eat into the budget. - """ - - def __init__(self, max_total: int): - self.max_total = max_total - self._used = 0 - self._lock = threading.Lock() - - def consume(self) -> bool: - """Try to consume one iteration. Returns True if allowed.""" - with self._lock: - if self._used >= self.max_total: - return False - self._used += 1 - return True - - def refund(self) -> None: - """Give back one iteration (e.g. for execute_code turns).""" - with self._lock: - if self._used > 0: - self._used -= 1 - - @property - def used(self) -> int: - with self._lock: - return self._used - - @property - def remaining(self) -> int: - with self._lock: - return max(0, self.max_total - self._used) - - - -# Maximum number of concurrent worker threads for parallel tool execution. _MAX_TOOL_WORKERS = 8 # Guard so the OpenRouter metadata pre-warm thread is only spawned once per