From d6c09ab94a54a7aafd6bfbeebac4b9d965152d17 Mon Sep 17 00:00:00 2001 From: simon-marcus Date: Tue, 14 Apr 2026 09:48:49 -0400 Subject: [PATCH 01/16] feat(api-server): stream /v1/responses SSE tool events --- gateway/platforms/api_server.py | 486 ++++++++++++++++++ tests/gateway/test_api_server.py | 125 +++++ .../docs/user-guide/features/api-server.md | 6 +- .../docs/user-guide/messaging/open-webui.md | 4 +- 4 files changed, 617 insertions(+), 4 deletions(-) diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 2077c9c85..60b03a2c6 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -515,6 +515,8 @@ class APIServerAdapter(BasePlatformAdapter): session_id: Optional[str] = None, stream_delta_callback=None, tool_progress_callback=None, + tool_start_callback=None, + tool_complete_callback=None, ) -> Any: """ Create an AIAgent instance using the gateway's runtime config. @@ -553,6 +555,8 @@ class APIServerAdapter(BasePlatformAdapter): platform="api_server", stream_delta_callback=stream_delta_callback, tool_progress_callback=tool_progress_callback, + tool_start_callback=tool_start_callback, + tool_complete_callback=tool_complete_callback, session_db=self._ensure_session_db(), fallback_model=fallback_model, ) @@ -965,6 +969,410 @@ class APIServerAdapter(BasePlatformAdapter): return response + async def _write_sse_responses( + self, + request: "web.Request", + response_id: str, + model: str, + created_at: int, + stream_q, + agent_task, + agent_ref, + conversation_history: List[Dict[str, str]], + user_message: str, + instructions: Optional[str], + conversation: Optional[str], + store: bool, + session_id: str, + ) -> "web.StreamResponse": + """Write an SSE stream for POST /v1/responses (OpenAI Responses API). 
+ + Emits spec-compliant event types as the agent runs: + + - ``response.created`` — initial envelope (status=in_progress) + - ``response.output_text.delta`` / ``response.output_text.done`` — + streamed assistant text + - ``response.output_item.added`` / ``response.output_item.done`` + with ``item.type == "function_call"`` — when the agent invokes a + tool (both events fire; the ``done`` event carries the finalized + ``arguments`` string) + - ``response.output_item.added`` with + ``item.type == "function_call_output"`` — tool result with + ``{call_id, output, status}`` + - ``response.completed`` — terminal event carrying the full + response object with all output items + usage (same payload + shape as the non-streaming path for parity) + - ``response.failed`` — terminal event on agent error + + If the client disconnects mid-stream, ``agent.interrupt()`` is + called so the agent stops issuing upstream LLM calls, then the + asyncio task is cancelled. When ``store=True`` the full response + is persisted to the ResponseStore after ``response.completed`` so GET + /v1/responses/{id} and ``previous_response_id`` chaining work the + same as the batch path (failed or disconnected streams are not stored). + """ + import queue as _q + + sse_headers = { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + } + origin = request.headers.get("Origin", "") + cors = self._cors_headers_for_origin(origin) if origin else None + if cors: + sse_headers.update(cors) + if session_id: + sse_headers["X-Hermes-Session-Id"] = session_id + response = web.StreamResponse(status=200, headers=sse_headers) + await response.prepare(request) + + # State accumulated during the stream + final_text_parts: List[str] = [] + # Track open function_call items by call_id so we can emit a matching + # ``done`` event when the tool completes. Order preserved. + pending_tool_calls: List[Dict[str, Any]] = [] + # Output items we've emitted so far (used to build the terminal + # response.completed payload). 
Kept in the order they appeared. + emitted_items: List[Dict[str, Any]] = [] + # Monotonic counter for output_index (spec requires it). + output_index = 0 + # Monotonic counter for call_id generation if the agent doesn't + # provide one (it doesn't, from tool_progress_callback). + call_counter = 0 + # Track the assistant message item id + content index for text + # delta events — the spec ties deltas to a specific item. + message_item_id = f"msg_{uuid.uuid4().hex[:24]}" + message_output_index: Optional[int] = None + message_opened = False + + async def _write_event(event_type: str, data: Dict[str, Any]) -> None: + payload = f"event: {event_type}\ndata: {json.dumps(data)}\n\n" + await response.write(payload.encode()) + + def _envelope(status: str) -> Dict[str, Any]: + env: Dict[str, Any] = { + "id": response_id, + "object": "response", + "status": status, + "created_at": created_at, + "model": model, + } + return env + + final_response_text = "" + agent_error: Optional[str] = None + usage: Dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} + + try: + # response.created — initial envelope, status=in_progress + created_env = _envelope("in_progress") + created_env["output"] = [] + await _write_event("response.created", { + "type": "response.created", + "response": created_env, + }) + last_activity = time.monotonic() + + async def _open_message_item() -> None: + """Emit response.output_item.added for the assistant message + the first time any text delta arrives.""" + nonlocal message_opened, message_output_index, output_index + if message_opened: + return + message_opened = True + message_output_index = output_index + output_index += 1 + item = { + "id": message_item_id, + "type": "message", + "status": "in_progress", + "role": "assistant", + "content": [], + } + await _write_event("response.output_item.added", { + "type": "response.output_item.added", + "output_index": message_output_index, + "item": item, + }) + + async def 
_emit_text_delta(delta_text: str) -> None: + await _open_message_item() + final_text_parts.append(delta_text) + await _write_event("response.output_text.delta", { + "type": "response.output_text.delta", + "item_id": message_item_id, + "output_index": message_output_index, + "content_index": 0, + "delta": delta_text, + }) + + async def _emit_tool_started(payload: Dict[str, Any]) -> str: + """Emit response.output_item.added for a function_call. + + Returns the call_id so the matching completion event can + reference it. Prefer the real ``tool_call_id`` from the + agent when available; fall back to a generated call id for + safety in tests or older code paths. + """ + nonlocal output_index, call_counter + call_counter += 1 + call_id = payload.get("tool_call_id") or f"call_{response_id[5:]}_{call_counter}" + args = payload.get("arguments", {}) + if isinstance(args, dict): + arguments_str = json.dumps(args) + else: + arguments_str = str(args) + item = { + "id": f"fc_{uuid.uuid4().hex[:24]}", + "type": "function_call", + "status": "in_progress", + "name": payload.get("name", ""), + "call_id": call_id, + "arguments": arguments_str, + } + idx = output_index + output_index += 1 + pending_tool_calls.append({ + "call_id": call_id, + "name": payload.get("name", ""), + "arguments": arguments_str, + "item_id": item["id"], + "output_index": idx, + }) + emitted_items.append({ + "type": "function_call", + "name": payload.get("name", ""), + "arguments": arguments_str, + "call_id": call_id, + }) + await _write_event("response.output_item.added", { + "type": "response.output_item.added", + "output_index": idx, + "item": item, + }) + return call_id + + async def _emit_tool_completed(payload: Dict[str, Any]) -> None: + """Emit response.output_item.done (function_call) followed + by response.output_item.added (function_call_output).""" + nonlocal output_index + call_id = payload.get("tool_call_id") + result = payload.get("result", "") + pending = None + if call_id: + for i, p in 
enumerate(pending_tool_calls): + if p["call_id"] == call_id: + pending = pending_tool_calls.pop(i) + break + if pending is None: + # Completion without a matching start — skip to avoid + # emitting orphaned done events. + return + + # function_call done + done_item = { + "id": pending["item_id"], + "type": "function_call", + "status": "completed", + "name": pending["name"], + "call_id": pending["call_id"], + "arguments": pending["arguments"], + } + await _write_event("response.output_item.done", { + "type": "response.output_item.done", + "output_index": pending["output_index"], + "item": done_item, + }) + + # function_call_output added (result) + result_str = result if isinstance(result, str) else json.dumps(result) + output_item = { + "id": f"fco_{uuid.uuid4().hex[:24]}", + "type": "function_call_output", + "call_id": pending["call_id"], + "output": result_str, + "status": "completed", + } + idx = output_index + output_index += 1 + emitted_items.append({ + "type": "function_call_output", + "call_id": pending["call_id"], + "output": result_str, + }) + await _write_event("response.output_item.added", { + "type": "response.output_item.added", + "output_index": idx, + "item": output_item, + }) + + # Main drain loop — thread-safe queue fed by agent callbacks. + async def _dispatch(it) -> None: + """Route a queue item to the correct SSE emitter. + + Plain strings are text deltas. Tagged tuples with + ``__tool_started__`` / ``__tool_completed__`` prefixes + are tool lifecycle events. + """ + if isinstance(it, tuple) and len(it) == 2 and isinstance(it[0], str): + tag, payload = it + if tag == "__tool_started__": + await _emit_tool_started(payload) + elif tag == "__tool_completed__": + await _emit_tool_completed(payload) + # Unknown tags are silently ignored (forward-compat). + elif isinstance(it, str): + await _emit_text_delta(it) + # Other types (non-string, non-tuple) are silently dropped. 
+ + loop = asyncio.get_event_loop() + while True: + try: + item = await loop.run_in_executor(None, lambda: stream_q.get(timeout=0.5)) + except _q.Empty: + if agent_task.done(): + # Drain remaining + while True: + try: + item = stream_q.get_nowait() + if item is None: + break + await _dispatch(item) + last_activity = time.monotonic() + except _q.Empty: + break + break + if time.monotonic() - last_activity >= CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS: + await response.write(b": keepalive\n\n") + last_activity = time.monotonic() + continue + + if item is None: # EOS sentinel + break + + await _dispatch(item) + last_activity = time.monotonic() + + # Pick up agent result + usage from the completed task + try: + result, agent_usage = await agent_task + usage = agent_usage or usage + # If the agent produced a final_response but no text + # deltas were streamed (e.g. some providers only emit + # the full response at the end), emit a single fallback + # delta so Responses clients still receive a live text part. 
+ agent_final = result.get("final_response", "") if isinstance(result, dict) else "" + if agent_final and not final_text_parts: + await _emit_text_delta(agent_final) + if agent_final and not final_response_text: + final_response_text = agent_final + if isinstance(result, dict) and result.get("error") and not final_response_text: + agent_error = result["error"] + except Exception as e: # noqa: BLE001 + logger.error("Error running agent for streaming responses: %s", e, exc_info=True) + agent_error = str(e) + + # Close the message item if it was opened + final_response_text = "".join(final_text_parts) or final_response_text + if message_opened: + await _write_event("response.output_text.done", { + "type": "response.output_text.done", + "item_id": message_item_id, + "output_index": message_output_index, + "content_index": 0, + "text": final_response_text, + }) + msg_done_item = { + "id": message_item_id, + "type": "message", + "status": "completed", + "role": "assistant", + "content": [ + {"type": "output_text", "text": final_response_text} + ], + } + await _write_event("response.output_item.done", { + "type": "response.output_item.done", + "output_index": message_output_index, + "item": msg_done_item, + }) + + # Always append a final message item in the completed + # response envelope so clients that only parse the terminal + # payload still see the assistant text. This mirrors the + # shape produced by _extract_output_items in the batch path. 
+ final_items: List[Dict[str, Any]] = list(emitted_items) + final_items.append({ + "type": "message", + "role": "assistant", + "content": [ + {"type": "output_text", "text": final_response_text or (agent_error or "")} + ], + }) + + if agent_error: + failed_env = _envelope("failed") + failed_env["output"] = final_items + failed_env["error"] = {"message": agent_error, "type": "server_error"} + failed_env["usage"] = { + "input_tokens": usage.get("input_tokens", 0), + "output_tokens": usage.get("output_tokens", 0), + "total_tokens": usage.get("total_tokens", 0), + } + await _write_event("response.failed", { + "type": "response.failed", + "response": failed_env, + }) + else: + completed_env = _envelope("completed") + completed_env["output"] = final_items + completed_env["usage"] = { + "input_tokens": usage.get("input_tokens", 0), + "output_tokens": usage.get("output_tokens", 0), + "total_tokens": usage.get("total_tokens", 0), + } + await _write_event("response.completed", { + "type": "response.completed", + "response": completed_env, + }) + + # Persist for future chaining / GET retrieval, mirroring + # the batch path behavior. + if store: + full_history = list(conversation_history) + full_history.append({"role": "user", "content": user_message}) + if isinstance(result, dict) and result.get("messages"): + full_history.extend(result["messages"]) + else: + full_history.append({"role": "assistant", "content": final_response_text}) + self._response_store.put(response_id, { + "response": completed_env, + "conversation_history": full_history, + "instructions": instructions, + }) + if conversation: + self._response_store.set_conversation(conversation, response_id) + + except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError, OSError): + # Client disconnected — interrupt the agent so it stops + # making upstream LLM calls, then cancel the task. 
+ agent = agent_ref[0] if agent_ref else None + if agent is not None: + try: + agent.interrupt("SSE client disconnected") + except Exception: + pass + if not agent_task.done(): + agent_task.cancel() + try: + await agent_task + except (asyncio.CancelledError, Exception): + pass + logger.info("SSE client disconnected; interrupted agent task %s", response_id) + + return response + async def _handle_responses(self, request: "web.Request") -> "web.Response": """POST /v1/responses — OpenAI Responses API format.""" auth_err = self._check_auth(request) @@ -1060,6 +1468,80 @@ class APIServerAdapter(BasePlatformAdapter): # Run the agent (with Idempotency-Key support) session_id = str(uuid.uuid4()) + stream = bool(body.get("stream", False)) + if stream: + # Streaming branch — emit OpenAI Responses SSE events as the + # agent runs so frontends can render text deltas and tool + # calls in real time. See _write_sse_responses for details. + import queue as _q + _stream_q: _q.Queue = _q.Queue() + + def _on_delta(delta): + # None from the agent is a CLI box-close signal, not EOS. + # Forwarding would kill the SSE stream prematurely; the + # SSE writer detects completion via agent_task.done(). + if delta is not None: + _stream_q.put(delta) + + def _on_tool_progress(event_type, name, preview, args, **kwargs): + """Queue non-start tool progress events if needed in future. + + The structured Responses stream uses ``tool_start_callback`` + and ``tool_complete_callback`` for exact call-id correlation, + so progress events are currently ignored here. 
+ """ + return + + def _on_tool_start(tool_call_id, function_name, function_args): + """Queue a started tool for live function_call streaming.""" + _stream_q.put(("__tool_started__", { + "tool_call_id": tool_call_id, + "name": function_name, + "arguments": function_args or {}, + })) + + def _on_tool_complete(tool_call_id, function_name, function_args, function_result): + """Queue a completed tool result for live function_call_output streaming.""" + _stream_q.put(("__tool_completed__", { + "tool_call_id": tool_call_id, + "name": function_name, + "arguments": function_args or {}, + "result": function_result, + })) + + agent_ref = [None] + agent_task = asyncio.ensure_future(self._run_agent( + user_message=user_message, + conversation_history=conversation_history, + ephemeral_system_prompt=instructions, + session_id=session_id, + stream_delta_callback=_on_delta, + tool_progress_callback=_on_tool_progress, + tool_start_callback=_on_tool_start, + tool_complete_callback=_on_tool_complete, + agent_ref=agent_ref, + )) + + response_id = f"resp_{uuid.uuid4().hex[:28]}" + model_name = body.get("model", self._model_name) + created_at = int(time.time()) + + return await self._write_sse_responses( + request=request, + response_id=response_id, + model=model_name, + created_at=created_at, + stream_q=_stream_q, + agent_task=agent_task, + agent_ref=agent_ref, + conversation_history=conversation_history, + user_message=user_message, + instructions=instructions, + conversation=conversation, + store=store, + session_id=session_id, + ) + async def _compute_response(): return await self._run_agent( user_message=user_message, @@ -1486,6 +1968,8 @@ class APIServerAdapter(BasePlatformAdapter): session_id: Optional[str] = None, stream_delta_callback=None, tool_progress_callback=None, + tool_start_callback=None, + tool_complete_callback=None, agent_ref: Optional[list] = None, ) -> tuple: """ @@ -1507,6 +1991,8 @@ class APIServerAdapter(BasePlatformAdapter): session_id=session_id, 
stream_delta_callback=stream_delta_callback, tool_progress_callback=tool_progress_callback, + tool_start_callback=tool_start_callback, + tool_complete_callback=tool_complete_callback, ) if agent_ref is not None: agent_ref[0] = agent diff --git a/tests/gateway/test_api_server.py b/tests/gateway/test_api_server.py index be1fc63bf..28df94e0f 100644 --- a/tests/gateway/test_api_server.py +++ b/tests/gateway/test_api_server.py @@ -1115,6 +1115,131 @@ class TestResponsesEndpoint: assert resp.status == 400 +class TestResponsesStreaming: + @pytest.mark.asyncio + async def test_stream_true_returns_responses_sse(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + async def _mock_run_agent(**kwargs): + cb = kwargs.get("stream_delta_callback") + if cb: + cb("Hello") + cb(" world") + return ( + {"final_response": "Hello world", "messages": [], "api_calls": 1}, + {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, + ) + + with patch.object(adapter, "_run_agent", side_effect=_mock_run_agent): + resp = await cli.post( + "/v1/responses", + json={"model": "hermes-agent", "input": "hi", "stream": True}, + ) + assert resp.status == 200 + assert "text/event-stream" in resp.headers.get("Content-Type", "") + body = await resp.text() + assert "event: response.created" in body + assert "event: response.output_text.delta" in body + assert "event: response.output_text.done" in body + assert "event: response.completed" in body + assert "Hello" in body + assert " world" in body + + @pytest.mark.asyncio + async def test_stream_emits_function_call_and_output_items(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + async def _mock_run_agent(**kwargs): + start_cb = kwargs.get("tool_start_callback") + complete_cb = kwargs.get("tool_complete_callback") + text_cb = kwargs.get("stream_delta_callback") + if start_cb: + start_cb("call_123", "read_file", {"path": "/tmp/test.txt"}) + if complete_cb: + 
complete_cb("call_123", "read_file", {"path": "/tmp/test.txt"}, '{"content":"hello"}') + if text_cb: + text_cb("Done.") + return ( + { + "final_response": "Done.", + "messages": [ + { + "role": "assistant", + "tool_calls": [ + { + "id": "call_123", + "function": { + "name": "read_file", + "arguments": '{"path":"/tmp/test.txt"}', + }, + } + ], + }, + { + "role": "tool", + "tool_call_id": "call_123", + "content": '{"content":"hello"}', + }, + ], + "api_calls": 1, + }, + {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, + ) + + with patch.object(adapter, "_run_agent", side_effect=_mock_run_agent): + resp = await cli.post( + "/v1/responses", + json={"model": "hermes-agent", "input": "read the file", "stream": True}, + ) + assert resp.status == 200 + body = await resp.text() + assert "event: response.output_item.added" in body + assert "event: response.output_item.done" in body + assert '"type": "function_call"' in body + assert '"type": "function_call_output"' in body + assert '"call_id": "call_123"' in body + assert '"name": "read_file"' in body + assert '"output": "{\\"content\\":\\"hello\\"}"' in body + + @pytest.mark.asyncio + async def test_streamed_response_is_stored_for_get(self, adapter): + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + async def _mock_run_agent(**kwargs): + cb = kwargs.get("stream_delta_callback") + if cb: + cb("Stored response") + return ( + {"final_response": "Stored response", "messages": [], "api_calls": 1}, + {"input_tokens": 1, "output_tokens": 2, "total_tokens": 3}, + ) + + with patch.object(adapter, "_run_agent", side_effect=_mock_run_agent): + resp = await cli.post( + "/v1/responses", + json={"model": "hermes-agent", "input": "store this", "stream": True}, + ) + body = await resp.text() + response_id = None + for line in body.splitlines(): + if line.startswith("data: "): + try: + payload = json.loads(line[len("data: "):]) + except json.JSONDecodeError: + continue + if payload.get("type") == 
"response.completed": + response_id = payload["response"]["id"] + break + assert response_id + + get_resp = await cli.get(f"/v1/responses/{response_id}") + assert get_resp.status == 200 + data = await get_resp.json() + assert data["id"] == response_id + assert data["status"] == "completed" + assert data["output"][-1]["content"][0]["text"] == "Stored response" + + # --------------------------------------------------------------------------- # Auth on endpoints # --------------------------------------------------------------------------- diff --git a/website/docs/user-guide/features/api-server.md b/website/docs/user-guide/features/api-server.md index efb254a00..52ed8e893 100644 --- a/website/docs/user-guide/features/api-server.md +++ b/website/docs/user-guide/features/api-server.md @@ -83,9 +83,11 @@ Standard OpenAI Chat Completions format. Stateless — the full conversation is } ``` -**Streaming** (`"stream": true`): Returns Server-Sent Events (SSE) with token-by-token response chunks. When streaming is enabled in config, tokens are emitted live as the LLM generates them. When disabled, the full response is sent as a single SSE chunk. +**Streaming** (`"stream": true`): Returns Server-Sent Events (SSE) with token-by-token response chunks. For **Chat Completions**, the stream uses standard `chat.completion.chunk` events plus Hermes' custom `hermes.tool.progress` event for tool-start UX. For **Responses**, the stream uses OpenAI Responses event types such as `response.created`, `response.output_text.delta`, `response.output_item.added`, `response.output_item.done`, and `response.completed`. -**Tool progress in streams**: When the agent calls tools during a streaming request, brief progress indicators are injected into the content stream as the tools start executing (e.g. `` `💻 pwd` ``, `` `🔍 Python docs` ``). These appear as inline markdown before the agent's response text, giving frontends like Open WebUI real-time visibility into tool execution. 
+**Tool progress in streams**: +- **Chat Completions**: Hermes emits `event: hermes.tool.progress` for tool-start visibility without polluting persisted assistant text. +- **Responses**: Hermes emits spec-native `function_call` and `function_call_output` output items during the SSE stream, so clients can render structured tool UI in real time. ### POST /v1/responses diff --git a/website/docs/user-guide/messaging/open-webui.md b/website/docs/user-guide/messaging/open-webui.md index 71860d367..b26d23edd 100644 --- a/website/docs/user-guide/messaging/open-webui.md +++ b/website/docs/user-guide/messaging/open-webui.md @@ -134,10 +134,10 @@ To use the Responses API mode: 3. Change **API Type** from "Chat Completions" to **"Responses (Experimental)"** 4. Save -With the Responses API, Open WebUI sends requests in the Responses format (`input` array + `instructions`), and Hermes Agent can preserve full tool call history across turns via `previous_response_id`. +With the Responses API, Open WebUI sends requests in the Responses format (`input` array + `instructions`), and Hermes Agent can preserve full tool call history across turns via `previous_response_id`. When `stream: true`, Hermes also streams spec-native `function_call` and `function_call_output` items, which enables custom structured tool-call UI in clients that render Responses events. :::note -Open WebUI currently manages conversation history client-side even in Responses mode — it sends the full message history in each request rather than using `previous_response_id`. The Responses API mode is mainly useful for future compatibility as frontends evolve. +Open WebUI currently manages conversation history client-side even in Responses mode — it sends the full message history in each request rather than using `previous_response_id`. 
The main advantage of Responses mode today is the structured event stream: text deltas, `function_call`, and `function_call_output` items arrive as OpenAI Responses SSE events instead of Chat Completions chunks. ::: ## How It Works From 302554b1588370dde48ecd78b9b64e2f6fd8c1fe Mon Sep 17 00:00:00 2001 From: simon-marcus Date: Tue, 14 Apr 2026 13:14:26 -0400 Subject: [PATCH 02/16] fix(api-server): format responses tool outputs for open webui --- gateway/platforms/api_server.py | 27 ++++++++++++++++++++++++--- tests/gateway/test_api_server.py | 7 +++++-- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 60b03a2c6..62196973d 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -1040,6 +1040,10 @@ class APIServerAdapter(BasePlatformAdapter): # Monotonic counter for call_id generation if the agent doesn't # provide one (it doesn't, from tool_progress_callback). call_counter = 0 + # Canonical Responses SSE events include a monotonically increasing + # sequence_number. Add it server-side for every emitted event so + # clients that validate the OpenAI event schema can parse our stream. + sequence_number = 0 # Track the assistant message item id + content index for text # delta events — the spec ties deltas to a specific item. 
message_item_id = f"msg_{uuid.uuid4().hex[:24]}" @@ -1047,6 +1051,10 @@ class APIServerAdapter(BasePlatformAdapter): message_opened = False async def _write_event(event_type: str, data: Dict[str, Any]) -> None: + nonlocal sequence_number + if "sequence_number" not in data: + data["sequence_number"] = sequence_number + sequence_number += 1 payload = f"event: {event_type}\ndata: {json.dumps(data)}\n\n" await response.write(payload.encode()) @@ -1105,6 +1113,7 @@ class APIServerAdapter(BasePlatformAdapter): "output_index": message_output_index, "content_index": 0, "delta": delta_text, + "logprobs": [], }) async def _emit_tool_started(payload: Dict[str, Any]) -> str: @@ -1187,11 +1196,12 @@ class APIServerAdapter(BasePlatformAdapter): # function_call_output added (result) result_str = result if isinstance(result, str) else json.dumps(result) + output_parts = [{"type": "input_text", "text": result_str}] output_item = { "id": f"fco_{uuid.uuid4().hex[:24]}", "type": "function_call_output", "call_id": pending["call_id"], - "output": result_str, + "output": output_parts, "status": "completed", } idx = output_index @@ -1199,13 +1209,18 @@ class APIServerAdapter(BasePlatformAdapter): emitted_items.append({ "type": "function_call_output", "call_id": pending["call_id"], - "output": result_str, + "output": output_parts, }) await _write_event("response.output_item.added", { "type": "response.output_item.added", "output_index": idx, "item": output_item, }) + await _write_event("response.output_item.done", { + "type": "response.output_item.done", + "output_index": idx, + "item": output_item, + }) # Main drain loop — thread-safe queue fed by agent callbacks. 
async def _dispatch(it) -> None: @@ -1282,6 +1297,7 @@ class APIServerAdapter(BasePlatformAdapter): "output_index": message_output_index, "content_index": 0, "text": final_response_text, + "logprobs": [], }) msg_done_item = { "id": message_item_id, @@ -1933,10 +1949,15 @@ class APIServerAdapter(BasePlatformAdapter): "call_id": tc.get("id", ""), }) elif role == "tool": + output_content = msg.get("content", "") + if isinstance(output_content, list): + output = output_content + else: + output = [{"type": "input_text", "text": str(output_content)}] items.append({ "type": "function_call_output", "call_id": msg.get("tool_call_id", ""), - "output": msg.get("content", ""), + "output": output, }) # Final assistant message diff --git a/tests/gateway/test_api_server.py b/tests/gateway/test_api_server.py index 28df94e0f..32346fc83 100644 --- a/tests/gateway/test_api_server.py +++ b/tests/gateway/test_api_server.py @@ -1142,6 +1142,8 @@ class TestResponsesStreaming: assert "event: response.output_text.delta" in body assert "event: response.output_text.done" in body assert "event: response.completed" in body + assert '"sequence_number":' in body + assert '"logprobs": []' in body assert "Hello" in body assert " world" in body @@ -1195,11 +1197,12 @@ class TestResponsesStreaming: body = await resp.text() assert "event: response.output_item.added" in body assert "event: response.output_item.done" in body + assert body.count("event: response.output_item.done") >= 2 assert '"type": "function_call"' in body assert '"type": "function_call_output"' in body assert '"call_id": "call_123"' in body assert '"name": "read_file"' in body - assert '"output": "{\\"content\\":\\"hello\\"}"' in body + assert '"output": [{"type": "input_text", "text": "{\\"content\\":\\"hello\\"}"}]' in body @pytest.mark.asyncio async def test_streamed_response_is_stored_for_get(self, adapter): @@ -1544,7 +1547,7 @@ class TestToolCallsInOutput: assert output[0]["call_id"] == "call_abc123" assert output[1]["type"] 
== "function_call_output" assert output[1]["call_id"] == "call_abc123" - assert output[1]["output"] == "42" + assert output[1]["output"] == [{"type": "input_text", "text": "42"}] assert output[2]["type"] == "message" assert output[2]["content"][0]["text"] == "The result is 42." From cf1d71882304c5c37f9194c389ac2159c23a1c4a Mon Sep 17 00:00:00 2001 From: Teknium Date: Tue, 14 Apr 2026 20:47:06 -0700 Subject: [PATCH 03/16] fix: keep batch-path function_call_output.output as string per OpenAI spec The streaming path emits output as content-part arrays for Open WebUI compatibility, but the batch (non-streaming) Responses API path must return output as a plain string per the OpenAI Responses API spec. Reverts the _extract_output_items change from the cherry-picked commits while preserving the streaming path's array format. --- gateway/platforms/api_server.py | 7 +------ tests/gateway/test_api_server.py | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 62196973d..32c56d1fb 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -1949,15 +1949,10 @@ class APIServerAdapter(BasePlatformAdapter): "call_id": tc.get("id", ""), }) elif role == "tool": - output_content = msg.get("content", "") - if isinstance(output_content, list): - output = output_content - else: - output = [{"type": "input_text", "text": str(output_content)}] items.append({ "type": "function_call_output", "call_id": msg.get("tool_call_id", ""), - "output": output, + "output": msg.get("content", ""), }) # Final assistant message diff --git a/tests/gateway/test_api_server.py b/tests/gateway/test_api_server.py index 32346fc83..8e3e066b8 100644 --- a/tests/gateway/test_api_server.py +++ b/tests/gateway/test_api_server.py @@ -1547,7 +1547,7 @@ class TestToolCallsInOutput: assert output[0]["call_id"] == "call_abc123" assert output[1]["type"] == "function_call_output" assert output[1]["call_id"] 
== "call_abc123" - assert output[1]["output"] == [{"type": "input_text", "text": "42"}] + assert output[1]["output"] == "42" assert output[2]["type"] == "message" assert output[2]["content"][0]["text"] == "The result is 42." From 31d06206630669a27ca0cc4f0261a414d3e8af1e Mon Sep 17 00:00:00 2001 From: Teknium Date: Tue, 14 Apr 2026 20:47:57 -0700 Subject: [PATCH 04/16] chore: add simon-marcus to AUTHOR_MAP --- scripts/release.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/release.py b/scripts/release.py index fb7924640..74f0fc420 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -117,6 +117,7 @@ AUTHOR_MAP = { "m@statecraft.systems": "mbierling", "balyan.sid@gmail.com": "balyansid", "oluwadareab12@gmail.com": "bennytimz", + "simon@simonmarcus.org": "simon-marcus", # ── bulk addition: 75 emails resolved via API, PR salvage bodies, noreply # crossref, and GH contributor list matching (April 2026 audit) ── "1115117931@qq.com": "aaronagent", From 82f364ffd1d7f85cb4faa0fbfd2095ada0a78f84 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Tue, 14 Apr 2026 20:52:18 -0700 Subject: [PATCH 05/16] feat: add --all flag to gateway start and restart commands (#10043) - gateway start --all: kills all stale gateway processes across all profiles before starting the current profile's service - gateway restart --all: stops all gateway processes across all profiles, then starts the current profile's service fresh - gateway stop --all: already existed, unchanged The --all flag was only available on 'stop' but not on 'start' or 'restart', causing 'unrecognized arguments' errors for users. 
--- hermes_cli/gateway.py | 41 +++++++++++++++++++++++++++++++++++++++++ hermes_cli/main.py | 2 ++ 2 files changed, 43 insertions(+) diff --git a/hermes_cli/gateway.py b/hermes_cli/gateway.py index 4b13bc70f..58d9f92ed 100644 --- a/hermes_cli/gateway.py +++ b/hermes_cli/gateway.py @@ -2919,6 +2919,15 @@ def gateway_command(args): elif subcmd == "start": system = getattr(args, 'system', False) + start_all = getattr(args, 'all', False) + + if start_all: + # Kill all stale gateway processes across all profiles before starting + killed = kill_gateway_processes(all_profiles=True) + if killed: + print(f"✓ Killed {killed} stale gateway process(es) across all profiles") + _wait_for_gateway_exit(timeout=10.0, force_after=5.0) + if is_termux(): print("Gateway service start is not supported on Termux because there is no system service manager.") print("Run manually: hermes gateway") @@ -3004,7 +3013,39 @@ def gateway_command(args): # Try service first, fall back to killing and restarting service_available = False system = getattr(args, 'system', False) + restart_all = getattr(args, 'all', False) service_configured = False + + if restart_all: + # --all: stop every gateway process across all profiles, then start fresh + service_stopped = False + if supports_systemd_services() and (get_systemd_unit_path(system=False).exists() or get_systemd_unit_path(system=True).exists()): + try: + systemd_stop(system=system) + service_stopped = True + except subprocess.CalledProcessError: + pass + elif is_macos() and get_launchd_plist_path().exists(): + try: + launchd_stop() + service_stopped = True + except subprocess.CalledProcessError: + pass + killed = kill_gateway_processes(all_profiles=True) + total = killed + (1 if service_stopped else 0) + if total: + print(f"✓ Stopped {total} gateway process(es) across all profiles") + _wait_for_gateway_exit(timeout=10.0, force_after=5.0) + + # Start the current profile's service fresh + print("Starting gateway...") + if supports_systemd_services() 
and (get_systemd_unit_path(system=False).exists() or get_systemd_unit_path(system=True).exists()): + systemd_start(system=system) + elif is_macos() and get_launchd_plist_path().exists(): + launchd_start() + else: + run_gateway(verbose=0) + return if supports_systemd_services() and (get_systemd_unit_path(system=False).exists() or get_systemd_unit_path(system=True).exists()): service_configured = True diff --git a/hermes_cli/main.py b/hermes_cli/main.py index c73344be4..017280184 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -4749,6 +4749,7 @@ For more help on a command: # gateway start gateway_start = gateway_subparsers.add_parser("start", help="Start the installed systemd/launchd background service") gateway_start.add_argument("--system", action="store_true", help="Target the Linux system-level gateway service") + gateway_start.add_argument("--all", action="store_true", help="Kill ALL stale gateway processes across all profiles before starting") # gateway stop gateway_stop = gateway_subparsers.add_parser("stop", help="Stop gateway service") @@ -4758,6 +4759,7 @@ For more help on a command: # gateway restart gateway_restart = gateway_subparsers.add_parser("restart", help="Restart gateway service") gateway_restart.add_argument("--system", action="store_true", help="Target the Linux system-level gateway service") + gateway_restart.add_argument("--all", action="store_true", help="Kill ALL gateway processes across all profiles before restarting") # gateway status gateway_status = gateway_subparsers.add_parser("status", help="Show gateway status") From 92385679b64ef0f34aca2ec1b1031c4e639622bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=98=BF=E6=B3=A5=E8=B1=86?= <1243352777@qq.com> Date: Tue, 14 Apr 2026 16:58:37 +0800 Subject: [PATCH 06/16] fix: reset retry counters after compression and stop poisoning conversation history Three bugfixes in the agent loop: 1. Reset retry counters after context compression. 
Without this, pre-compression retry counts carry over, causing the model to hit empty-response recovery immediately after a compression- induced context loss, wasting API calls on a now-valid context. 2. Unmute output in the final-response (no-tool-call) branch. _mute_post_response could be left True from a prior housekeeping turn, silently suppressing empty-response warnings and recovery status that the user should see. 3. Stop injecting 'Calling the X tools...' into assistant message content when falling back to prior-turn content. This mutated conversation history with synthetic text that the model never produced, poisoning subsequent turns. --- run_agent.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/run_agent.py b/run_agent.py index 0d6be24d0..0814a8b49 100644 --- a/run_agent.py +++ b/run_agent.py @@ -8012,6 +8012,15 @@ class AIAgent: # skipping them because conversation_history is still the # pre-compression length. conversation_history = None + # Fix: reset retry counters after compression so the model + # gets a fresh budget on the compressed context. Without + # this, pre-compression retries carry over and the model + # hits "(empty)" immediately after compression-induced + # context loss. + self._empty_content_retries = 0 + self._thinking_prefill_retries = 0 + self._last_content_with_tools = None + self._mute_post_response = False # Re-estimate after compression _preflight_tokens = estimate_request_tokens_rough( messages, @@ -10202,6 +10211,13 @@ class AIAgent: # No tool calls - this is the final response final_response = assistant_message.content or "" + # Fix: unmute output when entering the no-tool-call branch + # so the user can see empty-response warnings and recovery + # status messages. _mute_post_response was set during a + # prior housekeeping tool turn and should not silence the + # final response path. 
+ self._mute_post_response = False + # Check if response only has think block with no actual content after it if not self._has_content_after_think_block(final_response): # ── Partial stream recovery ───────────────────── @@ -10239,16 +10255,10 @@ class AIAgent: self._emit_status("↻ Empty response after tool calls — using earlier content as final answer") self._last_content_with_tools = None self._empty_content_retries = 0 - for i in range(len(messages) - 1, -1, -1): - msg = messages[i] - if msg.get("role") == "assistant" and msg.get("tool_calls"): - tool_names = [] - for tc in msg["tool_calls"]: - if not tc or not isinstance(tc, dict): continue - fn = tc.get("function", {}) - tool_names.append(fn.get("name", "unknown")) - msg["content"] = f"Calling the {', '.join(tool_names)} tool{'s' if len(tool_names) > 1 else ''}..." - break + # Do NOT modify the assistant message content — the + # old code injected "Calling the X tools..." which + # poisoned the conversation history. Just use the + # fallback text as the final response and break. 
final_response = self._strip_think_blocks(fallback).strip() self._response_was_previewed = True break From 23b87c8ca82299ccdbcde30c6b53bbae84da93de Mon Sep 17 00:00:00 2001 From: Teknium Date: Tue, 14 Apr 2026 20:55:34 -0700 Subject: [PATCH 07/16] chore: add zons-zhaozhy to AUTHOR_MAP --- scripts/release.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/release.py b/scripts/release.py index 74f0fc420..046255627 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -118,6 +118,7 @@ AUTHOR_MAP = { "balyan.sid@gmail.com": "balyansid", "oluwadareab12@gmail.com": "bennytimz", "simon@simonmarcus.org": "simon-marcus", + "1243352777@qq.com": "zons-zhaozhy", # ── bulk addition: 75 emails resolved via API, PR salvage bodies, noreply # crossref, and GH contributor list matching (April 2026 audit) ── "1115117931@qq.com": "aaronagent", From ca0ae56ccbb5a1308beece973b2c93c50075318e Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Tue, 14 Apr 2026 21:03:05 -0700 Subject: [PATCH 08/16] fix: add 402 billing error hint to gateway error handler (#5220) (#10057) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: hermes gateway restart waits for service to come back up (#8260) Previously, systemd_restart() sent SIGUSR1 to the gateway, printed 'restart requested', and returned immediately. The gateway still needed to drain active agents, exit with code 75, wait for systemd's RestartSec=30, and start the new process. The user saw 'success' but the gateway was actually down for 30-60 seconds. Now the SIGUSR1 path blocks with progress feedback: Phase 1 — wait for old process to die: ⏳ User service draining active work... Polls os.kill(pid, 0) until ProcessLookupError (up to 90s) Phase 2 — wait for new process to become active: ⏳ Waiting for hermes-gateway to restart... 
Polls systemctl is-active + verifies new PID (up to 60s) Success: ✓ User service restarted (PID 12345) Timeout: ⚠ User service did not become active within 60s. Check status: hermes gateway status Check logs: journalctl --user -u hermes-gateway --since '2 min ago' The reload-or-restart fallback path (line 1189) already blocks because systemctl reload-or-restart is synchronous. Test plan: - Updated test to verify wait-for-restart behavior - All 118 gateway CLI tests pass * fix: add 402 billing error hint to gateway error handler (#5220) The gateway's exception handler for agent errors had specific hints for HTTP 401, 429, 529, 400, 500 — but not 402 (Payment Required / quota exhausted). Users hitting billing limits from custom proxy providers got a generic error with no guidance. Added: 'Your API balance or quota is exhausted. Check your provider dashboard.' The underlying billing classification (error_classifier.py) already correctly handles 402 as FailoverReason.billing with credential rotation and fallback. The original issue (#5220) where 402 killed the entire gateway was from an older version — on current main, 402 is excluded from the is_client_error abort path (line 9460) and goes through the proper retry/fallback/fail flow. Combined with PR #9875 (auto-recover from unexpected SIGTERM), even edge cases where the gateway dies are now survivable. --- gateway/run.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gateway/run.py b/gateway/run.py index d137d73c3..c59e9dff1 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4020,6 +4020,8 @@ class GatewayRunner: _hist_len = len(history) if 'history' in locals() else 0 if status_code == 401: status_hint = " Check your API key or run `claude /login` to refresh OAuth credentials." + elif status_code == 402: + status_hint = " Your API balance or quota is exhausted. Check your provider dashboard." 
elif status_code == 429: # Check if this is a plan usage limit (resets on a schedule) vs a transient rate limit _err_body = getattr(e, "response", None) From 5cbb45d93e8e70a91c517c8b89f7b817a02e5842 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Tue, 14 Apr 2026 21:06:32 -0700 Subject: [PATCH 09/16] fix: preserve session_id across previous_response_id chains in /v1/responses (#10059) The /v1/responses endpoint generated a new UUID session_id for every request, even when previous_response_id was provided. This caused each turn of a multi-turn conversation to appear as a separate session on the web dashboard, despite the conversation history being correctly chained. Fix: store session_id alongside the response in the ResponseStore, and reuse it when a subsequent request chains via previous_response_id. Applies to both the non-streaming /v1/responses path and the streaming SSE path. The /v1/runs endpoint also gains session continuity from stored responses (explicit body.session_id still takes priority). Adds test verifying session_id is preserved across chained requests. 
--- gateway/platforms/api_server.py | 13 +++++++--- tests/gateway/test_api_server.py | 41 ++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 32c56d1fb..7f4c8e8d6 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -1366,6 +1366,7 @@ class APIServerAdapter(BasePlatformAdapter): "response": completed_env, "conversation_history": full_history, "instructions": instructions, + "session_id": session_id, }) if conversation: self._response_store.set_conversation(conversation, response_id) @@ -1459,11 +1460,13 @@ class APIServerAdapter(BasePlatformAdapter): if previous_response_id: logger.debug("Both conversation_history and previous_response_id provided; using conversation_history") + stored_session_id = None if not conversation_history and previous_response_id: stored = self._response_store.get(previous_response_id) if stored is None: return web.json_response(_openai_error(f"Previous response not found: {previous_response_id}"), status=404) conversation_history = list(stored.get("conversation_history", [])) + stored_session_id = stored.get("session_id") # If no instructions provided, carry forward from previous if instructions is None: instructions = stored.get("instructions") @@ -1481,8 +1484,9 @@ class APIServerAdapter(BasePlatformAdapter): if body.get("truncation") == "auto" and len(conversation_history) > 100: conversation_history = conversation_history[-100:] - # Run the agent (with Idempotency-Key support) - session_id = str(uuid.uuid4()) + # Reuse session from previous_response_id chain so the dashboard + # groups the entire conversation under one session entry. 
+ session_id = stored_session_id or str(uuid.uuid4()) stream = bool(body.get("stream", False)) if stream: @@ -1631,6 +1635,7 @@ class APIServerAdapter(BasePlatformAdapter): "response": response_data, "conversation_history": full_history, "instructions": instructions, + "session_id": session_id, }) # Update conversation mapping so the next request with the same # conversation name automatically chains to this response @@ -2145,10 +2150,12 @@ class APIServerAdapter(BasePlatformAdapter): if previous_response_id: logger.debug("Both conversation_history and previous_response_id provided; using conversation_history") + stored_session_id = None if not conversation_history and previous_response_id: stored = self._response_store.get(previous_response_id) if stored: conversation_history = list(stored.get("conversation_history", [])) + stored_session_id = stored.get("session_id") if instructions is None: instructions = stored.get("instructions") @@ -2167,7 +2174,7 @@ class APIServerAdapter(BasePlatformAdapter): ) conversation_history.append({"role": msg["role"], "content": str(content)}) - session_id = body.get("session_id") or run_id + session_id = body.get("session_id") or stored_session_id or run_id ephemeral_system_prompt = instructions async def _run_and_close(): diff --git a/tests/gateway/test_api_server.py b/tests/gateway/test_api_server.py index 8e3e066b8..d0cebacb8 100644 --- a/tests/gateway/test_api_server.py +++ b/tests/gateway/test_api_server.py @@ -1016,6 +1016,47 @@ class TestResponsesEndpoint: assert len(call_kwargs["conversation_history"]) > 0 assert call_kwargs["user_message"] == "Now add 1 more" + @pytest.mark.asyncio + async def test_previous_response_id_preserves_session(self, adapter): + """Chained responses via previous_response_id reuse the same session_id.""" + mock_result = { + "final_response": "ok", + "messages": [{"role": "assistant", "content": "ok"}], + "api_calls": 1, + } + usage = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} + + 
app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + # First request — establishes a session + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = (mock_result, usage) + resp1 = await cli.post( + "/v1/responses", + json={"model": "hermes-agent", "input": "Hello"}, + ) + assert resp1.status == 200 + first_session_id = mock_run.call_args.kwargs["session_id"] + data1 = await resp1.json() + response_id = data1["id"] + + # Second request — chains from the first + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = (mock_result, usage) + resp2 = await cli.post( + "/v1/responses", + json={ + "model": "hermes-agent", + "input": "Follow up", + "previous_response_id": response_id, + }, + ) + assert resp2.status == 200 + second_session_id = mock_run.call_args.kwargs["session_id"] + + # Session must be the same across the chain + assert first_session_id == second_session_id + @pytest.mark.asyncio async def test_invalid_previous_response_id_returns_404(self, adapter): app = _create_app(adapter) From 2871ef18078ba2464d9afebeaf3e7ad67e4d4a5f Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Tue, 14 Apr 2026 21:07:37 -0700 Subject: [PATCH 10/16] docs: note session continuity for previous_response_id chains (#10060) --- website/docs/user-guide/features/api-server.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/docs/user-guide/features/api-server.md b/website/docs/user-guide/features/api-server.md index 52ed8e893..ebcb4523e 100644 --- a/website/docs/user-guide/features/api-server.md +++ b/website/docs/user-guide/features/api-server.md @@ -130,7 +130,7 @@ Chain responses to maintain full context (including tool calls) across turns: } ``` -The server reconstructs the full conversation from the stored response chain — all previous tool calls and results are preserved. 
+The server reconstructs the full conversation from the stored response chain — all previous tool calls and results are preserved. Chained requests also share the same session, so multi-turn conversations appear as a single entry in the dashboard and session history. #### Named conversations From 4b2a1a4337a0409d13146596a219b480e216471a Mon Sep 17 00:00:00 2001 From: Greer Guthrie <149740518+g-guthrie@users.noreply.github.com> Date: Tue, 14 Apr 2026 18:02:25 -0500 Subject: [PATCH 11/16] fix(tools): auto-discover built-in tool modules --- model_tools.py | 42 +-------------------- tests/tools/test_registry.py | 72 +++++++++++++++++++++++++++++++++++- tools/registry.py | 45 ++++++++++++++++++++++ 3 files changed, 118 insertions(+), 41 deletions(-) diff --git a/model_tools.py b/model_tools.py index 1924b2516..801255b79 100644 --- a/model_tools.py +++ b/model_tools.py @@ -26,7 +26,7 @@ import logging import threading from typing import Dict, Any, List, Optional, Tuple -from tools.registry import registry +from tools.registry import discover_builtin_tools, registry from toolsets import resolve_toolset, validate_toolset logger = logging.getLogger(__name__) @@ -129,45 +129,7 @@ def _run_async(coro): # Tool Discovery (importing each module triggers its registry.register calls) # ============================================================================= -def _discover_tools(): - """Import all tool modules to trigger their registry.register() calls. - - Wrapped in a function so import errors in optional tools (e.g., fal_client - not installed) don't prevent the rest from loading. 
- """ - _modules = [ - "tools.web_tools", - "tools.terminal_tool", - "tools.file_tools", - "tools.vision_tools", - "tools.mixture_of_agents_tool", - "tools.image_generation_tool", - "tools.skills_tool", - "tools.skill_manager_tool", - "tools.browser_tool", - "tools.cronjob_tools", - "tools.rl_training_tool", - "tools.tts_tool", - "tools.todo_tool", - "tools.memory_tool", - "tools.session_search_tool", - "tools.clarify_tool", - "tools.code_execution_tool", - "tools.delegate_tool", - "tools.process_registry", - "tools.send_message_tool", - # "tools.honcho_tools", # Removed — Honcho is now a memory provider plugin - "tools.homeassistant_tool", - ] - import importlib - for mod_name in _modules: - try: - importlib.import_module(mod_name) - except Exception as e: - logger.warning("Could not import tool module %s: %s", mod_name, e) - - -_discover_tools() +discover_builtin_tools() # MCP tool discovery (external MCP servers from config) try: diff --git a/tests/tools/test_registry.py b/tests/tools/test_registry.py index 6b2756886..85246bd76 100644 --- a/tests/tools/test_registry.py +++ b/tests/tools/test_registry.py @@ -2,8 +2,10 @@ import json import threading +from pathlib import Path +from unittest.mock import patch -from tools.registry import ToolRegistry +from tools.registry import ToolRegistry, discover_builtin_tools def _dummy_handler(args, **kwargs): @@ -286,6 +288,74 @@ class TestCheckFnExceptionHandling: assert any(u["name"] == "crashes" for u in unavailable) +class TestBuiltinDiscovery: + def test_matches_previous_manual_builtin_tool_set(self): + expected = { + "tools.browser_tool", + "tools.clarify_tool", + "tools.code_execution_tool", + "tools.cronjob_tools", + "tools.delegate_tool", + "tools.file_tools", + "tools.homeassistant_tool", + "tools.image_generation_tool", + "tools.memory_tool", + "tools.mixture_of_agents_tool", + "tools.process_registry", + "tools.rl_training_tool", + "tools.send_message_tool", + "tools.session_search_tool", + 
"tools.skill_manager_tool", + "tools.skills_tool", + "tools.terminal_tool", + "tools.todo_tool", + "tools.tts_tool", + "tools.vision_tools", + "tools.web_tools", + } + + with patch("tools.registry.importlib.import_module"): + imported = discover_builtin_tools(Path(__file__).resolve().parents[2] / "tools") + + assert set(imported) == expected + + def test_imports_only_self_registering_modules(self, tmp_path): + tools_dir = tmp_path / "tools" + tools_dir.mkdir() + (tools_dir / "__init__.py").write_text("", encoding="utf-8") + (tools_dir / "registry.py").write_text("", encoding="utf-8") + (tools_dir / "alpha.py").write_text( + "from tools.registry import registry\nregistry.register(name='alpha', toolset='x', schema={}, handler=lambda *_a, **_k: '{}')\n", + encoding="utf-8", + ) + (tools_dir / "beta.py").write_text("VALUE = 1\n", encoding="utf-8") + + with patch("tools.registry.importlib.import_module") as mock_import: + imported = discover_builtin_tools(tools_dir) + + assert imported == ["tools.alpha"] + mock_import.assert_called_once_with("tools.alpha") + + def test_skips_mcp_tool_even_if_it_registers(self, tmp_path): + tools_dir = tmp_path / "tools" + tools_dir.mkdir() + (tools_dir / "__init__.py").write_text("", encoding="utf-8") + (tools_dir / "mcp_tool.py").write_text( + "from tools.registry import registry\nregistry.register(name='mcp_alpha', toolset='mcp-test', schema={}, handler=lambda *_a, **_k: '{}')\n", + encoding="utf-8", + ) + (tools_dir / "alpha.py").write_text( + "from tools.registry import registry\nregistry.register(name='alpha', toolset='x', schema={}, handler=lambda *_a, **_k: '{}')\n", + encoding="utf-8", + ) + + with patch("tools.registry.importlib.import_module") as mock_import: + imported = discover_builtin_tools(tools_dir) + + assert imported == ["tools.alpha"] + mock_import.assert_called_once_with("tools.alpha") + + class TestEmojiMetadata: """Verify per-tool emoji registration and lookup.""" diff --git a/tools/registry.py b/tools/registry.py 
index ebda77807..53939047b 100644 --- a/tools/registry.py +++ b/tools/registry.py @@ -14,14 +14,59 @@ Import chain (circular-import safe): run_agent.py, cli.py, batch_runner.py, etc. """ +import ast +import importlib import json import logging import threading +from pathlib import Path from typing import Callable, Dict, List, Optional, Set logger = logging.getLogger(__name__) +def _module_registers_tools(module_path: Path) -> bool: + """Return True when the module contains a direct ``registry.register(...)`` call.""" + try: + source = module_path.read_text(encoding="utf-8") + tree = ast.parse(source, filename=str(module_path)) + except (OSError, SyntaxError): + return False + + for node in ast.walk(tree): + if not isinstance(node, ast.Call): + continue + func = node.func + if ( + isinstance(func, ast.Attribute) + and func.attr == "register" + and isinstance(func.value, ast.Name) + and func.value.id == "registry" + ): + return True + return False + + +def discover_builtin_tools(tools_dir: Optional[Path] = None) -> List[str]: + """Import built-in self-registering tool modules and return their module names.""" + tools_path = Path(tools_dir) if tools_dir is not None else Path(__file__).resolve().parent + module_names = [ + f"tools.{path.stem}" + for path in sorted(tools_path.glob("*.py")) + if path.name not in {"__init__.py", "registry.py", "mcp_tool.py"} + and _module_registers_tools(path) + ] + + imported: List[str] = [] + for mod_name in module_names: + try: + importlib.import_module(mod_name) + imported.append(mod_name) + except Exception as e: + logger.warning("Could not import tool module %s: %s", mod_name, e) + return imported + + class ToolEntry: """Metadata for a single registered tool.""" From fc6cb5b970f006dba448941ce5b3888fc36662fb Mon Sep 17 00:00:00 2001 From: Teknium Date: Tue, 14 Apr 2026 20:51:55 -0700 Subject: [PATCH 12/16] fix: tighten AST check to module-level only The original tree-wide ast.walk() would match registry.register() calls inside 
functions too. Restrict to top-level ast.Expr statements so helper modules that call registry.register() inside a function are never picked up as tool modules. --- tools/registry.py | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/tools/registry.py b/tools/registry.py index 53939047b..e6d554e2b 100644 --- a/tools/registry.py +++ b/tools/registry.py @@ -25,26 +25,32 @@ from typing import Callable, Dict, List, Optional, Set logger = logging.getLogger(__name__) +def _is_registry_register_call(node: ast.AST) -> bool: + """Return True when *node* is a ``registry.register(...)`` call expression.""" + if not isinstance(node, ast.Expr) or not isinstance(node.value, ast.Call): + return False + func = node.value.func + return ( + isinstance(func, ast.Attribute) + and func.attr == "register" + and isinstance(func.value, ast.Name) + and func.value.id == "registry" + ) + + def _module_registers_tools(module_path: Path) -> bool: - """Return True when the module contains a direct ``registry.register(...)`` call.""" + """Return True when the module contains a top-level ``registry.register(...)`` call. + + Only inspects module-body statements so that helper modules which happen + to call ``registry.register()`` inside a function are not picked up. 
+ """ try: source = module_path.read_text(encoding="utf-8") tree = ast.parse(source, filename=str(module_path)) except (OSError, SyntaxError): return False - for node in ast.walk(tree): - if not isinstance(node, ast.Call): - continue - func = node.func - if ( - isinstance(func, ast.Attribute) - and func.attr == "register" - and isinstance(func.value, ast.Name) - and func.value.id == "registry" - ): - return True - return False + return any(_is_registry_register_call(stmt) for stmt in tree.body) def discover_builtin_tools(tools_dir: Optional[Path] = None) -> List[str]: From ef04de3e9851c1349e1a295eb3056557ab9e49e6 Mon Sep 17 00:00:00 2001 From: Teknium Date: Tue, 14 Apr 2026 21:03:34 -0700 Subject: [PATCH 13/16] docs: update tool-adding instructions for auto-discovery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - AGENTS.md: 3 files → 2 files, remove _discover_tools() step - adding-tools.md: remove Step 3, note auto-discovery - architecture.md: update discovery description - tools-runtime.md: replace manual list with discover_builtin_tools() docs - hermes-agent skill: remove manual import step --- AGENTS.md | 8 ++-- .../hermes-agent/SKILL.md | 4 +- website/docs/developer-guide/adding-tools.md | 19 +++------- website/docs/developer-guide/architecture.md | 2 +- website/docs/developer-guide/tools-runtime.md | 38 ++++++------------- 5 files changed, 24 insertions(+), 47 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index e4b998f5e..c5757cc52 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -13,7 +13,7 @@ source venv/bin/activate # ALWAYS activate before running Python ``` hermes-agent/ ├── run_agent.py # AIAgent class — core conversation loop -├── model_tools.py # Tool orchestration, _discover_tools(), handle_function_call() +├── model_tools.py # Tool orchestration, discover_builtin_tools(), handle_function_call() ├── toolsets.py # Toolset definitions, _HERMES_CORE_TOOLS list ├── cli.py # HermesCLI class — interactive CLI 
orchestrator ├── hermes_state.py # SessionDB — SQLite session store (FTS5 search) @@ -181,7 +181,7 @@ if canonical == "mycommand": ## Adding New Tools -Requires changes in **3 files**: +Requires changes in **2 files**: **1. Create `tools/your_tool.py`:** ```python @@ -204,9 +204,9 @@ registry.register( ) ``` -**2. Add import** in `model_tools.py` `_discover_tools()` list. +**2. Add to `toolsets.py`** — either `_HERMES_CORE_TOOLS` (all platforms) or a new toolset. -**3. Add to `toolsets.py`** — either `_HERMES_CORE_TOOLS` (all platforms) or a new toolset. +Auto-discovery: any `tools/*.py` file with a top-level `registry.register()` call is imported automatically — no manual import list to maintain. The registry handles schema collection, dispatch, availability checking, and error wrapping. All handlers MUST return a JSON string. diff --git a/skills/autonomous-ai-agents/hermes-agent/SKILL.md b/skills/autonomous-ai-agents/hermes-agent/SKILL.md index 9e0b412f5..77e1b1d18 100644 --- a/skills/autonomous-ai-agents/hermes-agent/SKILL.md +++ b/skills/autonomous-ai-agents/hermes-agent/SKILL.md @@ -650,9 +650,9 @@ registry.register( ) ``` -**2. Add import** in `model_tools.py` → `_discover_tools()` list. +**2. Add to `toolsets.py`** → `_HERMES_CORE_TOOLS` list. -**3. Add to `toolsets.py`** → `_HERMES_CORE_TOOLS` list. +Auto-discovery: any `tools/*.py` file with a top-level `registry.register()` call is imported automatically — no manual list needed. All handlers must return JSON strings. Use `get_hermes_home()` for paths, never hardcode `~/.hermes`. diff --git a/website/docs/developer-guide/adding-tools.md b/website/docs/developer-guide/adding-tools.md index 76f8477e3..497202bfc 100644 --- a/website/docs/developer-guide/adding-tools.md +++ b/website/docs/developer-guide/adding-tools.md @@ -14,11 +14,12 @@ Make it a **Tool** when it requires end-to-end integration with API keys, custom ## Overview -Adding a tool touches **3 files**: +Adding a tool touches **2 files**: 1. 
**`tools/your_tool.py`** — handler, schema, check function, `registry.register()` call 2. **`toolsets.py`** — add tool name to `_HERMES_CORE_TOOLS` (or a specific toolset) -3. **`model_tools.py`** — add `"tools.your_tool"` to the `_discover_tools()` list + +Any `tools/*.py` file with a top-level `registry.register()` call is auto-discovered at startup — no manual import list required. ## Step 1: Create the Tool File @@ -124,19 +125,9 @@ _HERMES_CORE_TOOLS = [ }, ``` -## Step 3: Add Discovery Import +## ~~Step 3: Add Discovery Import~~ (No longer needed) -In `model_tools.py`, add the module to the `_discover_tools()` list: - -```python -def _discover_tools(): - _modules = [ - ... - "tools.weather_tool", # <-- add here - ] -``` - -This import triggers the `registry.register()` call at the bottom of your tool file. +Tool modules with a top-level `registry.register()` call are auto-discovered by `discover_builtin_tools()` in `tools/registry.py`. No manual import list to maintain — just create your file in `tools/` and it's picked up at startup. ## Async Handlers diff --git a/website/docs/developer-guide/architecture.md b/website/docs/developer-guide/architecture.md index eec24815b..5b881c7e2 100644 --- a/website/docs/developer-guide/architecture.md +++ b/website/docs/developer-guide/architecture.md @@ -275,4 +275,4 @@ model_tools.py (imports tools/registry + triggers tool discovery) run_agent.py, cli.py, batch_runner.py, environments/ ``` -This chain means tool registration happens at import time, before any agent instance is created. Adding a new tool requires an import in `model_tools.py`'s `_discover_tools()` list. +This chain means tool registration happens at import time, before any agent instance is created. Any `tools/*.py` file with a top-level `registry.register()` call is auto-discovered — no manual import list needed. 
diff --git a/website/docs/developer-guide/tools-runtime.md b/website/docs/developer-guide/tools-runtime.md index 8e349a505..851ad6bc9 100644 --- a/website/docs/developer-guide/tools-runtime.md +++ b/website/docs/developer-guide/tools-runtime.md @@ -42,37 +42,23 @@ registry.register( Each call creates a `ToolEntry` stored in the singleton `ToolRegistry._tools` dict keyed by tool name. If a name collision occurs across toolsets, a warning is logged and the later registration wins. -### Discovery: `_discover_tools()` +### Discovery: `discover_builtin_tools()` -When `model_tools.py` is imported, it calls `_discover_tools()` which imports every tool module in order: +When `model_tools.py` is imported, it calls `discover_builtin_tools()` from `tools/registry.py`. This function scans every `tools/*.py` file using AST parsing to find modules that contain top-level `registry.register()` calls, then imports them: ```python -_modules = [ - "tools.web_tools", - "tools.terminal_tool", - "tools.file_tools", - "tools.vision_tools", - "tools.mixture_of_agents_tool", - "tools.image_generation_tool", - "tools.skills_tool", - "tools.skill_manager_tool", - "tools.browser_tool", - "tools.cronjob_tools", - "tools.rl_training_tool", - "tools.tts_tool", - "tools.todo_tool", - "tools.memory_tool", - "tools.session_search_tool", - "tools.clarify_tool", - "tools.code_execution_tool", - "tools.delegate_tool", - "tools.process_registry", - "tools.send_message_tool", - # "tools.honcho_tools", # Removed — Honcho is now a memory provider plugin - "tools.homeassistant_tool", -] +# tools/registry.py (simplified) +def discover_builtin_tools(tools_dir=None): + tools_path = Path(tools_dir) if tools_dir else Path(__file__).parent + for path in sorted(tools_path.glob("*.py")): + if path.name in {"__init__.py", "registry.py", "mcp_tool.py"}: + continue + if _module_registers_tools(path): # AST check for top-level registry.register() + importlib.import_module(f"tools.{path.stem}") ``` +This auto-discovery 
means new tool files are picked up automatically — no manual list to maintain. The AST check only matches top-level `registry.register()` calls (not calls inside functions), so helper modules in `tools/` are not imported. + Each import triggers the module's `registry.register()` calls. Errors in optional tools (e.g., missing `fal_client` for image generation) are caught and logged — they don't prevent other tools from loading. After core tool discovery, MCP tools and plugin tools are also discovered: From ba24f058ed34f5d6531246a87904601225c302b1 Mon Sep 17 00:00:00 2001 From: Teknium Date: Tue, 14 Apr 2026 21:06:00 -0700 Subject: [PATCH 14/16] docs: fix stale docstring reference to _discover_tools in mcp_tool.py --- tools/mcp_tool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/mcp_tool.py b/tools/mcp_tool.py index fa8b945ca..50655fa38 100644 --- a/tools/mcp_tool.py +++ b/tools/mcp_tool.py @@ -2036,7 +2036,7 @@ def register_mcp_servers(servers: Dict[str, dict]) -> List[str]: def discover_mcp_tools() -> List[str]: """Entry point: load config, connect to MCP servers, register tools. - Called from ``model_tools._discover_tools()``. Safe to call even when + Called from ``model_tools`` after ``discover_builtin_tools()``. Safe to call even when the ``mcp`` package is not installed (returns empty list). Idempotent for already-connected servers. If some servers failed on a From c5688e7c8ba4f46a0cbfad0b0be3a5ac5616350b Mon Sep 17 00:00:00 2001 From: Teknium Date: Tue, 14 Apr 2026 21:15:12 -0700 Subject: [PATCH 15/16] fix(gateway): break compression-exhaustion infinite loop and auto-reset session (#9893) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When compression fails after max attempts, the agent returns {completed: False, partial: True} but was missing the 'failed' flag. 
The gateway's agent_failed_early guard checked for 'failed' AND 'not final_response', but _run_agent_blocking always converts errors to final_response — making the guard dead code. This caused the oversized session to persist, creating an infinite fail loop where every subsequent message hits the same compression failure. Changes: - run_agent.py: add 'failed: True' and 'compression_exhausted: True' to all 5 compression-exhaustion return paths - gateway/run.py (_run_agent_blocking): forward 'failed' and 'compression_exhausted' flags through to the caller - gateway/run.py (_handle_message_with_agent): fix agent_failed_early to check bool(failed) without the broken 'not final_response' clause; auto-reset the session when compression is exhausted so the next message starts fresh - Update tests to match new guard logic and add TestCompressionExhaustedFlag test class Closes #9893 --- gateway/run.py | 31 +++++++--- run_agent.py | 20 ++++-- .../test_1630_context_overflow_loop.py | 62 +++++++++++++------ 3 files changed, 82 insertions(+), 31 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index c59e9dff1..1bef295c3 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3898,14 +3898,11 @@ class GatewayRunner: # intermediate reasoning) so sessions can be resumed with full context # and transcripts are useful for debugging and training data. # - # IMPORTANT: When the agent failed before producing any response - # (e.g. context-overflow 400), do NOT persist the user's message. + # IMPORTANT: When the agent failed (e.g. context-overflow 400, + # compression exhausted), do NOT persist the user's message. # Persisting it would make the session even larger, causing the - # same failure on the next attempt — an infinite loop. (#1630) - agent_failed_early = ( - agent_result.get("failed") - and not agent_result.get("final_response") - ) + # same failure on the next attempt — an infinite loop. 
(#1630, #9893) + agent_failed_early = bool(agent_result.get("failed")) if agent_failed_early: logger.info( "Skipping transcript persistence for failed request in " @@ -3913,6 +3910,24 @@ class GatewayRunner: session_entry.session_id, ) + # When compression is exhausted, the session is permanently too + # large to process. Auto-reset it so the next message starts + # fresh instead of replaying the same oversized context in an + # infinite fail loop. (#9893) + if agent_result.get("compression_exhausted") and session_entry and session_key: + logger.info( + "Auto-resetting session %s after compression exhaustion.", + session_entry.session_id, + ) + self.session_store.reset_session(session_key) + self._evict_cached_agent(session_key) + self._session_model_overrides.pop(session_key, None) + response = (response or "") + ( + "\n\n🔄 Session auto-reset — the conversation exceeded the " + "maximum context size and could not be compressed further. " + "Your next message will start a fresh session." 
+ ) + ts = datetime.now().isoformat() # If this is a fresh session (no history), write the full tool @@ -8626,6 +8641,8 @@ class GatewayRunner: "final_response": error_msg, "messages": result.get("messages", []), "api_calls": result.get("api_calls", 0), + "failed": result.get("failed", False), + "compression_exhausted": result.get("compression_exhausted", False), "tools": tools_holder[0] or [], "history_offset": len(agent_history), "last_prompt_tokens": _last_prompt_toks, diff --git a/run_agent.py b/run_agent.py index 0814a8b49..080156051 100644 --- a/run_agent.py +++ b/run_agent.py @@ -9312,7 +9312,9 @@ class AIAgent: "completed": False, "api_calls": api_call_count, "error": f"Request payload too large: max compression attempts ({max_compression_attempts}) reached.", - "partial": True + "partial": True, + "failed": True, + "compression_exhausted": True, } self._emit_status(f"⚠️ Request payload too large (413) — compression attempt {compression_attempts}/{max_compression_attempts}...") @@ -9341,7 +9343,9 @@ class AIAgent: "completed": False, "api_calls": api_call_count, "error": "Request payload too large (413). Cannot compress further.", - "partial": True + "partial": True, + "failed": True, + "compression_exhausted": True, } # Check for context-length errors BEFORE generic 4xx handler. 
@@ -9392,7 +9396,9 @@ class AIAgent: "completed": False, "api_calls": api_call_count, "error": f"Context length exceeded: max compression attempts ({max_compression_attempts}) reached.", - "partial": True + "partial": True, + "failed": True, + "compression_exhausted": True, } restart_with_compressed_messages = True break @@ -9442,7 +9448,9 @@ class AIAgent: "completed": False, "api_calls": api_call_count, "error": f"Context length exceeded: max compression attempts ({max_compression_attempts}) reached.", - "partial": True + "partial": True, + "failed": True, + "compression_exhausted": True, } self._emit_status(f"🗜️ Context too large (~{approx_tokens:,} tokens) — compressing ({compression_attempts}/{max_compression_attempts})...") @@ -9473,7 +9481,9 @@ class AIAgent: "completed": False, "api_calls": api_call_count, "error": f"Context length exceeded ({approx_tokens:,} tokens). Cannot compress further.", - "partial": True + "partial": True, + "failed": True, + "compression_exhausted": True, } # Check for non-retryable client errors. 
The classifier diff --git a/tests/run_agent/test_1630_context_overflow_loop.py b/tests/run_agent/test_1630_context_overflow_loop.py index d087fee4f..c33aaa967 100644 --- a/tests/run_agent/test_1630_context_overflow_loop.py +++ b/tests/run_agent/test_1630_context_overflow_loop.py @@ -136,33 +136,29 @@ class TestGatewaySkipsPersistenceOnFailure: the gateway should NOT persist messages to the transcript.""" def test_agent_failed_early_detected(self): - """The agent_failed_early flag is True when failed=True and - no final_response.""" + """The agent_failed_early flag is True when failed=True, + regardless of final_response.""" agent_result = { "failed": True, "final_response": None, "messages": [], "error": "Non-retryable client error", } - agent_failed_early = ( - agent_result.get("failed") - and not agent_result.get("final_response") - ) + agent_failed_early = bool(agent_result.get("failed")) assert agent_failed_early - def test_agent_with_response_not_failed_early(self): - """When the agent has a final_response, it's not a failed-early - scenario even if failed=True.""" + def test_agent_failed_with_error_response_still_detected(self): + """When _run_agent_blocking converts an error to final_response, + the failed flag should still trigger agent_failed_early. 
This + was the core bug in #9893 — the old guard checked + ``not final_response`` which was always False after conversion.""" agent_result = { "failed": True, - "final_response": "Here is a partial response", + "final_response": "⚠️ Request payload too large: max compression attempts reached.", "messages": [], } - agent_failed_early = ( - agent_result.get("failed") - and not agent_result.get("final_response") - ) - assert not agent_failed_early + agent_failed_early = bool(agent_result.get("failed")) + assert agent_failed_early def test_successful_agent_not_failed_early(self): """A successful agent result should not trigger skip.""" @@ -170,13 +166,41 @@ class TestGatewaySkipsPersistenceOnFailure: "final_response": "Hello!", "messages": [{"role": "assistant", "content": "Hello!"}], } - agent_failed_early = ( - agent_result.get("failed") - and not agent_result.get("final_response") - ) + agent_failed_early = bool(agent_result.get("failed")) assert not agent_failed_early +class TestCompressionExhaustedFlag: + """When compression is exhausted, the agent should set both + failed=True and compression_exhausted=True so the gateway can + auto-reset the session. 
(#9893)""" + + def test_compression_exhausted_returns_carry_flag(self): + """Simulate the return dict from a compression-exhausted agent.""" + agent_result = { + "messages": [], + "completed": False, + "api_calls": 3, + "error": "Request payload too large: max compression attempts (3) reached.", + "partial": True, + "failed": True, + "compression_exhausted": True, + } + assert agent_result.get("failed") + assert agent_result.get("compression_exhausted") + + def test_normal_failure_not_compression_exhausted(self): + """Non-compression failures should not have compression_exhausted.""" + agent_result = { + "messages": [], + "completed": False, + "failed": True, + "error": "Invalid API response after 3 retries", + } + assert agent_result.get("failed") + assert not agent_result.get("compression_exhausted") + + # --------------------------------------------------------------------------- # Test 3: Context-overflow error messages # --------------------------------------------------------------------------- From 8548893d14724b8f1e1e74ca9315a86fc3ee8c08 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Tue, 14 Apr 2026 21:20:37 -0700 Subject: [PATCH 16/16] =?UTF-8?q?feat:=20entry-level=20Podman=20support=20?= =?UTF-8?q?=E2=80=94=20find=5Fdocker()=20+=20rootless=20entrypoint=20(#100?= =?UTF-8?q?66)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - find_docker() now checks HERMES_DOCKER_BINARY env var first, then docker on PATH, then podman on PATH, then macOS known locations - Entrypoint respects HERMES_HOME env var (was hardcoded to /opt/data) - Entrypoint uses groupmod -o to tolerate non-unique GIDs (fixes macOS GID 20 conflict with Debian's dialout group) - Entrypoint makes chown best-effort so rootless Podman continues instead of failing with 'Operation not permitted' - 5 new tests covering env var override, podman fallback, precedence Based on work by alanjds (PR #3996) and malaiwah 
(PR #8115). Closes #4084. --- .env.example | 4 +++ docker/entrypoint.sh | 19 +++++++---- tests/tools/test_docker_find.py | 56 +++++++++++++++++++++++++++++++++ tools/environments/docker.py | 28 ++++++++++++++--- 4 files changed, 96 insertions(+), 11 deletions(-) mode change 100644 => 100755 docker/entrypoint.sh diff --git a/.env.example b/.env.example index 0317296ba..76be6ce26 100644 --- a/.env.example +++ b/.env.example @@ -145,6 +145,10 @@ # Only override here if you need to force a backend without touching config.yaml: # TERMINAL_ENV=local +# Override the container runtime binary (e.g. to use Podman instead of Docker). +# Useful on systems where Docker's storage driver is broken or unavailable. +# HERMES_DOCKER_BINARY=/usr/local/bin/podman + # Container images (for singularity/docker/modal backends) # TERMINAL_DOCKER_IMAGE=nikolaik/python-nodejs:python3.11-nodejs20 # TERMINAL_SINGULARITY_IMAGE=docker://nikolaik/python-nodejs:python3.11-nodejs20 diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh old mode 100644 new mode 100755 index dc1edd32c..c46497dcc --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -1,13 +1,14 @@ #!/bin/bash -# Docker entrypoint: bootstrap config files into the mounted volume, then run hermes. +# Docker/Podman entrypoint: bootstrap config files into the mounted volume, then run hermes. set -e -HERMES_HOME="/opt/data" +HERMES_HOME="${HERMES_HOME:-/opt/data}" INSTALL_DIR="/opt/hermes" # --- Privilege dropping via gosu --- -# When started as root (the default), optionally remap the hermes user/group -# to match host-side ownership, fix volume permissions, then re-exec as hermes. +# When started as root (the default for Docker, or fakeroot in rootless Podman), +# optionally remap the hermes user/group to match host-side ownership, fix volume +# permissions, then re-exec as hermes. 
if [ "$(id -u)" = "0" ]; then if [ -n "$HERMES_UID" ] && [ "$HERMES_UID" != "$(id -u hermes)" ]; then echo "Changing hermes UID to $HERMES_UID" @@ -16,13 +17,19 @@ if [ "$(id -u)" = "0" ]; then if [ -n "$HERMES_GID" ] && [ "$HERMES_GID" != "$(id -g hermes)" ]; then echo "Changing hermes GID to $HERMES_GID" - groupmod -g "$HERMES_GID" hermes + # -o allows non-unique GID (e.g. macOS GID 20 "staff" may already exist + # as "dialout" in the Debian-based container image) + groupmod -o -g "$HERMES_GID" hermes 2>/dev/null || true fi actual_hermes_uid=$(id -u hermes) if [ "$(stat -c %u "$HERMES_HOME" 2>/dev/null)" != "$actual_hermes_uid" ]; then echo "$HERMES_HOME is not owned by $actual_hermes_uid, fixing" - chown -R hermes:hermes "$HERMES_HOME" + # In rootless Podman the container's "root" is mapped to an unprivileged + # host UID — chown will fail. That's fine: the volume is already owned + # by the mapped user on the host side. + chown -R hermes:hermes "$HERMES_HOME" 2>/dev/null || \ + echo "Warning: chown failed (rootless container?) 
— continuing anyway" fi echo "Dropping root privileges" diff --git a/tests/tools/test_docker_find.py b/tests/tools/test_docker_find.py index c1fb58a3e..0cf9c3208 100644 --- a/tests/tools/test_docker_find.py +++ b/tests/tools/test_docker_find.py @@ -46,3 +46,59 @@ class TestFindDocker: with patch("tools.environments.docker.shutil.which", return_value=None): second = docker_mod.find_docker() assert first == second == "/usr/local/bin/docker" + + def test_env_var_override_takes_precedence(self, tmp_path): + """HERMES_DOCKER_BINARY overrides PATH and known-location discovery.""" + fake_binary = tmp_path / "podman" + fake_binary.write_text("#!/bin/sh\n") + fake_binary.chmod(0o755) + + with patch.dict(os.environ, {"HERMES_DOCKER_BINARY": str(fake_binary)}), \ + patch("tools.environments.docker.shutil.which", return_value="/usr/bin/docker"): + result = docker_mod.find_docker() + assert result == str(fake_binary) + + def test_env_var_override_ignored_if_not_executable(self, tmp_path): + """Non-executable HERMES_DOCKER_BINARY falls through to normal discovery.""" + fake_binary = tmp_path / "podman" + fake_binary.write_text("#!/bin/sh\n") + fake_binary.chmod(0o644) # not executable + + with patch.dict(os.environ, {"HERMES_DOCKER_BINARY": str(fake_binary)}), \ + patch("tools.environments.docker.shutil.which", return_value="/usr/bin/docker"): + result = docker_mod.find_docker() + assert result == "/usr/bin/docker" + + def test_env_var_override_ignored_if_nonexistent(self): + """Non-existent HERMES_DOCKER_BINARY path falls through.""" + with patch.dict(os.environ, {"HERMES_DOCKER_BINARY": "/nonexistent/podman"}), \ + patch("tools.environments.docker.shutil.which", return_value="/usr/bin/docker"): + result = docker_mod.find_docker() + assert result == "/usr/bin/docker" + + def test_podman_on_path_used_when_docker_missing(self): + """When docker is not on PATH, podman is tried next.""" + def which_side_effect(name): + if name == "docker": + return None + if name == "podman": + 
return "/usr/bin/podman" + return None + + with patch("tools.environments.docker.shutil.which", side_effect=which_side_effect), \ + patch("tools.environments.docker._DOCKER_SEARCH_PATHS", []): + result = docker_mod.find_docker() + assert result == "/usr/bin/podman" + + def test_docker_preferred_over_podman(self): + """When both docker and podman are on PATH, docker wins.""" + def which_side_effect(name): + if name == "docker": + return "/usr/bin/docker" + if name == "podman": + return "/usr/bin/podman" + return None + + with patch("tools.environments.docker.shutil.which", side_effect=which_side_effect): + result = docker_mod.find_docker() + assert result == "/usr/bin/docker" diff --git a/tools/environments/docker.py b/tools/environments/docker.py index 2341778f4..d2ea5c964 100644 --- a/tools/environments/docker.py +++ b/tools/environments/docker.py @@ -99,23 +99,41 @@ def _load_hermes_env_vars() -> dict[str, str]: def find_docker() -> Optional[str]: - """Locate the docker CLI binary. + """Locate the docker (or podman) CLI binary. - Checks ``shutil.which`` first (respects PATH), then probes well-known - install locations on macOS where Docker Desktop may not be in PATH - (e.g. when running as a gateway service via launchd). + Resolution order: + 1. ``HERMES_DOCKER_BINARY`` env var — explicit override (e.g. ``/usr/bin/podman``) + 2. ``docker`` on PATH via ``shutil.which`` + 3. ``podman`` on PATH via ``shutil.which`` + 4. Well-known macOS Docker Desktop install locations - Returns the absolute path, or ``None`` if docker cannot be found. + Returns the absolute path, or ``None`` if neither runtime can be found. """ global _docker_executable if _docker_executable is not None: return _docker_executable + # 1. Explicit override via env var (e.g. 
for Podman on immutable distros) + override = os.getenv("HERMES_DOCKER_BINARY") + if override and os.path.isfile(override) and os.access(override, os.X_OK): + _docker_executable = override + logger.info("Using HERMES_DOCKER_BINARY override: %s", override) + return override + + # 2. docker on PATH found = shutil.which("docker") if found: _docker_executable = found return found + # 3. podman on PATH (drop-in compatible for our use case) + found = shutil.which("podman") + if found: + _docker_executable = found + logger.info("Using podman as container runtime: %s", found) + return found + + # 4. Well-known macOS Docker Desktop locations for path in _DOCKER_SEARCH_PATHS: if os.path.isfile(path) and os.access(path, os.X_OK): _docker_executable = path