diff --git a/run_agent.py b/run_agent.py index e7dec855a..36491e644 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3585,7 +3585,20 @@ class AIAgent: def _call_chat_completions(): """Stream a chat completions response.""" - stream_kwargs = {**api_kwargs, "stream": True, "stream_options": {"include_usage": True}} + import httpx as _httpx + _base_timeout = float(os.getenv("HERMES_API_TIMEOUT", 900.0)) + _stream_read_timeout = float(os.getenv("HERMES_STREAM_READ_TIMEOUT", 60.0)) + stream_kwargs = { + **api_kwargs, + "stream": True, + "stream_options": {"include_usage": True}, + "timeout": _httpx.Timeout( + connect=30.0, + read=_stream_read_timeout, + write=_base_timeout, + pool=30.0, + ), + } request_client_holder["client"] = self._create_request_openai_client( reason="chat_completion_stream_request" ) @@ -3653,6 +3666,7 @@ class AIAgent: name = entry["function"]["name"] if name and idx not in tool_gen_notified: tool_gen_notified.add(idx) + _fire_first_delta() self._fire_tool_gen_started(name) if chunk.choices[0].finish_reason: @@ -3721,6 +3735,7 @@ class AIAgent: has_tool_use = True tool_name = getattr(block, "name", None) if tool_name: + _fire_first_delta() self._fire_tool_gen_started(tool_name) elif event_type == "content_block_delta": @@ -3742,29 +3757,84 @@ class AIAgent: return stream.get_final_message() def _call(): + import httpx as _httpx + + _max_stream_retries = int(os.getenv("HERMES_STREAM_RETRIES", 2)) + try: - if self.api_mode == "anthropic_messages": - self._try_refresh_anthropic_client_credentials() - result["response"] = _call_anthropic() - else: - result["response"] = _call_chat_completions() - except Exception as e: - if deltas_were_sent["yes"]: - # Streaming failed AFTER some tokens were already delivered - # to consumers. Don't fall back — that would cause - # double-delivery (partial streamed + full non-streamed). - # Let the error propagate; the partial content already - # reached the user via the stream. 
- logger.warning("Streaming failed after partial delivery, not falling back: %s", e) - result["error"] = e - else: - # Streaming failed before any tokens reached consumers. - # Safe to fall back to the standard non-streaming path. - logger.info("Streaming failed before delivery, falling back to non-streaming: %s", e) + for _stream_attempt in range(_max_stream_retries + 1): try: - result["response"] = self._interruptible_api_call(api_kwargs) - except Exception as fallback_err: - result["error"] = fallback_err + if self.api_mode == "anthropic_messages": + self._try_refresh_anthropic_client_credentials() + result["response"] = _call_anthropic() + else: + result["response"] = _call_chat_completions() + return # success + except Exception as e: + if deltas_were_sent["yes"]: + # Streaming failed AFTER some tokens were already + # delivered. Don't retry or fall back — partial + # content already reached the user. + logger.warning( + "Streaming failed after partial delivery, not retrying: %s", e + ) + result["error"] = e + return + + _is_timeout = isinstance( + e, (_httpx.ReadTimeout, _httpx.ConnectTimeout, _httpx.PoolTimeout) + ) + _is_conn_err = isinstance( + e, (_httpx.ConnectError, _httpx.RemoteProtocolError, ConnectionError) + ) + + if _is_timeout or _is_conn_err: + # Transient network / timeout error. Retry the + # streaming request with a fresh connection rather + # than falling back to non-streaming (which would + # hang for up to 15 min on the same dead server). 
+ if _stream_attempt < _max_stream_retries: + logger.info( + "Streaming attempt %s/%s failed (%s: %s), " + "retrying with fresh connection...", + _stream_attempt + 1, + _max_stream_retries + 1, + type(e).__name__, + e, + ) + # Close the stale request client before retry + stale = request_client_holder.get("client") + if stale is not None: + self._close_request_openai_client( + stale, reason="stream_retry_cleanup" + ) + request_client_holder["client"] = None + continue + # Exhausted retries — propagate to outer loop + logger.warning( + "Streaming exhausted %s retries on transient error: %s", + _max_stream_retries + 1, + e, + ) + result["error"] = e + return + + # Non-transient error: fall back to non-streaming once, but only + # when the message says streaming is not supported (not auth/4xx). + err_msg = str(e).lower() + if "stream" in err_msg and "not supported" in err_msg: + logger.info( + "Streaming not supported, falling back to non-streaming: %s", e + ) + try: + result["response"] = self._interruptible_api_call(api_kwargs) + except Exception as fallback_err: + result["error"] = fallback_err + return + + # Any other error (auth, 4xx, unknown) — propagate to outer retry loop + result["error"] = e + return finally: request_client = request_client_holder.get("client") if request_client is not None: diff --git a/tests/tools/test_skills_hub.py b/tests/tools/test_skills_hub.py index c74fa2d88..778a77ba1 100644 --- a/tests/tools/test_skills_hub.py +++ b/tests/tools/test_skills_hub.py @@ -305,6 +305,154 @@ class TestSkillsShSource: assert bundle.files["SKILL.md"] == "# react" assert mock_get.called + @patch("tools.skills_hub._write_index_cache") + @patch("tools.skills_hub._read_index_cache", return_value=None) + @patch("tools.skills_hub.httpx.get") + @patch.object(GitHubSource, "fetch") + def test_fetch_falls_back_to_tree_search_for_deeply_nested_skills( + self, mock_fetch, mock_get, _mock_read_cache, _mock_write_cache, + ): + """Skills in deeply nested dirs (e.g. 
cli-tool/components/skills/dev/my-skill/) + are found via the GitHub Trees API when candidate paths and shallow scan fail.""" + tree_entries = [ + {"path": "README.md", "type": "blob"}, + {"path": "cli-tool/components/skills/development/my-skill/SKILL.md", "type": "blob"}, + {"path": "cli-tool/components/skills/development/other-skill/SKILL.md", "type": "blob"}, + ] + + def _httpx_get_side_effect(url, **kwargs): + resp = MagicMock() + if "/api/search" in url: + resp.status_code = 404 + return resp + if url.endswith("/contents/"): + # Root listing for shallow scan — return empty so it falls through + resp.status_code = 200 + resp.json = lambda: [] + return resp + if "/contents/" in url: + # All contents API calls fail (candidate paths miss) + resp.status_code = 404 + return resp + if url.endswith("owner/repo"): + # Repo info → default branch + resp.status_code = 200 + resp.json = lambda: {"default_branch": "main"} + return resp + if "/git/trees/main" in url: + resp.status_code = 200 + resp.json = lambda: {"tree": tree_entries} + return resp + # skills.sh detail page + resp.status_code = 200 + resp.text = "