Merge branch 'main' of github.com:NousResearch/hermes-agent into feat/ink-refactor

This commit is contained in:
Brooklyn Nicholson 2026-04-15 00:02:31 -05:00
commit 561cea0d4a
23 changed files with 1082 additions and 149 deletions

View file

@ -145,6 +145,10 @@
# Only override here if you need to force a backend without touching config.yaml:
# TERMINAL_ENV=local
# Override the container runtime binary (e.g. to use Podman instead of Docker).
# Useful on systems where Docker's storage driver is broken or unavailable.
# HERMES_DOCKER_BINARY=/usr/local/bin/podman
# Container images (for singularity/docker/modal backends)
# TERMINAL_DOCKER_IMAGE=nikolaik/python-nodejs:python3.11-nodejs20
# TERMINAL_SINGULARITY_IMAGE=docker://nikolaik/python-nodejs:python3.11-nodejs20

View file

@ -13,7 +13,7 @@ source venv/bin/activate # ALWAYS activate before running Python
```
hermes-agent/
├── run_agent.py # AIAgent class — core conversation loop
├── model_tools.py # Tool orchestration, _discover_tools(), handle_function_call()
├── model_tools.py # Tool orchestration, discover_builtin_tools(), handle_function_call()
├── toolsets.py # Toolset definitions, _HERMES_CORE_TOOLS list
├── cli.py # HermesCLI class — interactive CLI orchestrator
├── hermes_state.py # SessionDB — SQLite session store (FTS5 search)
@ -245,7 +245,7 @@ npm test # vitest
## Adding New Tools
Requires changes in **3 files**:
Requires changes in **2 files**:
**1. Create `tools/your_tool.py`:**
```python
@ -268,9 +268,9 @@ registry.register(
)
```
**2. Add import** in `model_tools.py` `_discover_tools()` list.
**2. Add to `toolsets.py`** — either `_HERMES_CORE_TOOLS` (all platforms) or a new toolset.
**3. Add to `toolsets.py`** — either `_HERMES_CORE_TOOLS` (all platforms) or a new toolset.
Auto-discovery: any `tools/*.py` file with a top-level `registry.register()` call is imported automatically — no manual import list to maintain.
The registry handles schema collection, dispatch, availability checking, and error wrapping. All handlers MUST return a JSON string.
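For orientation, a complete minimal tool file might look like this sketch; the `echo` tool name, the `hermes_core` toolset string, and the OpenAI-style schema shape are illustrative assumptions:
```python
# tools/echo_tool.py: hypothetical minimal self-registering tool.
import json

from tools.registry import registry


def _echo_handler(args, **kwargs):
    # Handlers MUST return a JSON string, never a raw dict.
    return json.dumps({"echo": args.get("text", "")})


registry.register(
    name="echo",
    toolset="hermes_core",  # assumption: actual toolset names live in toolsets.py
    schema={  # assumption: OpenAI function-calling schema shape
        "name": "echo",
        "description": "Echo the provided text back as JSON.",
        "parameters": {
            "type": "object",
            "properties": {"text": {"type": "string"}},
            "required": ["text"],
        },
    },
    handler=_echo_handler,
)
```
Because the `registry.register()` call sits at module top level, the file is picked up by auto-discovery with no further wiring.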

docker/entrypoint.sh Normal file → Executable file
View file

@ -1,13 +1,14 @@
#!/bin/bash
# Docker entrypoint: bootstrap config files into the mounted volume, then run hermes.
# Docker/Podman entrypoint: bootstrap config files into the mounted volume, then run hermes.
set -e
HERMES_HOME="/opt/data"
HERMES_HOME="${HERMES_HOME:-/opt/data}"
INSTALL_DIR="/opt/hermes"
# --- Privilege dropping via gosu ---
# When started as root (the default), optionally remap the hermes user/group
# to match host-side ownership, fix volume permissions, then re-exec as hermes.
# When started as root (the default for Docker, or fakeroot in rootless Podman),
# optionally remap the hermes user/group to match host-side ownership, fix volume
# permissions, then re-exec as hermes.
if [ "$(id -u)" = "0" ]; then
if [ -n "$HERMES_UID" ] && [ "$HERMES_UID" != "$(id -u hermes)" ]; then
echo "Changing hermes UID to $HERMES_UID"
@ -16,13 +17,19 @@ if [ "$(id -u)" = "0" ]; then
if [ -n "$HERMES_GID" ] && [ "$HERMES_GID" != "$(id -g hermes)" ]; then
echo "Changing hermes GID to $HERMES_GID"
groupmod -g "$HERMES_GID" hermes
# -o allows non-unique GID (e.g. macOS GID 20 "staff" may already exist
# as "dialout" in the Debian-based container image)
groupmod -o -g "$HERMES_GID" hermes 2>/dev/null || true
fi
actual_hermes_uid=$(id -u hermes)
if [ "$(stat -c %u "$HERMES_HOME" 2>/dev/null)" != "$actual_hermes_uid" ]; then
echo "$HERMES_HOME is not owned by $actual_hermes_uid, fixing"
chown -R hermes:hermes "$HERMES_HOME"
# In rootless Podman the container's "root" is mapped to an unprivileged
# host UID — chown will fail. That's fine: the volume is already owned
# by the mapped user on the host side.
chown -R hermes:hermes "$HERMES_HOME" 2>/dev/null || \
echo "Warning: chown failed (rootless container?) — continuing anyway"
fi
echo "Dropping root privileges"

View file

@ -515,6 +515,8 @@ class APIServerAdapter(BasePlatformAdapter):
session_id: Optional[str] = None,
stream_delta_callback=None,
tool_progress_callback=None,
tool_start_callback=None,
tool_complete_callback=None,
) -> Any:
"""
Create an AIAgent instance using the gateway's runtime config.
@ -553,6 +555,8 @@ class APIServerAdapter(BasePlatformAdapter):
platform="api_server",
stream_delta_callback=stream_delta_callback,
tool_progress_callback=tool_progress_callback,
tool_start_callback=tool_start_callback,
tool_complete_callback=tool_complete_callback,
session_db=self._ensure_session_db(),
fallback_model=fallback_model,
)
@ -965,6 +969,427 @@ class APIServerAdapter(BasePlatformAdapter):
return response
async def _write_sse_responses(
self,
request: "web.Request",
response_id: str,
model: str,
created_at: int,
stream_q,
agent_task,
agent_ref,
conversation_history: List[Dict[str, str]],
user_message: str,
instructions: Optional[str],
conversation: Optional[str],
store: bool,
session_id: str,
) -> "web.StreamResponse":
"""Write an SSE stream for POST /v1/responses (OpenAI Responses API).
Emits spec-compliant event types as the agent runs:
- ``response.created`` initial envelope (status=in_progress)
- ``response.output_text.delta`` / ``response.output_text.done``
streamed assistant text
- ``response.output_item.added`` / ``response.output_item.done``
with ``item.type == "function_call"`` when the agent invokes a
tool (both events fire; the ``done`` event carries the finalized
``arguments`` string)
- ``response.output_item.added`` / ``response.output_item.done`` with
``item.type == "function_call_output"``, the tool result with
``{call_id, output, status}``
- ``response.completed`` terminal event carrying the full
response object with all output items + usage (same payload
shape as the non-streaming path for parity)
- ``response.failed`` terminal event on agent error
If the client disconnects mid-stream, ``agent.interrupt()`` is
called so the agent stops issuing upstream LLM calls, then the
asyncio task is cancelled. When ``store=True`` the full response
is persisted to the ResponseStore after ``response.completed`` so GET
/v1/responses/{id} and ``previous_response_id`` chaining work the
same as the batch path.
"""
import queue as _q
sse_headers = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
}
origin = request.headers.get("Origin", "")
cors = self._cors_headers_for_origin(origin) if origin else None
if cors:
sse_headers.update(cors)
if session_id:
sse_headers["X-Hermes-Session-Id"] = session_id
response = web.StreamResponse(status=200, headers=sse_headers)
await response.prepare(request)
# State accumulated during the stream
final_text_parts: List[str] = []
# Track open function_call items by name so we can emit a matching
# ``done`` event when the tool completes. Order preserved.
pending_tool_calls: List[Dict[str, Any]] = []
# Output items we've emitted so far (used to build the terminal
# response.completed payload). Kept in the order they appeared.
emitted_items: List[Dict[str, Any]] = []
# Monotonic counter for output_index (spec requires it).
output_index = 0
# Monotonic counter for fallback call_id generation when the agent
# doesn't supply one (tool_start_callback provides a real tool_call_id;
# tests and older code paths may not).
call_counter = 0
# Canonical Responses SSE events include a monotonically increasing
# sequence_number. Add it server-side for every emitted event so
# clients that validate the OpenAI event schema can parse our stream.
sequence_number = 0
# Track the assistant message item id + content index for text
# delta events — the spec ties deltas to a specific item.
message_item_id = f"msg_{uuid.uuid4().hex[:24]}"
message_output_index: Optional[int] = None
message_opened = False
async def _write_event(event_type: str, data: Dict[str, Any]) -> None:
nonlocal sequence_number
if "sequence_number" not in data:
data["sequence_number"] = sequence_number
sequence_number += 1
payload = f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
await response.write(payload.encode())
def _envelope(status: str) -> Dict[str, Any]:
env: Dict[str, Any] = {
"id": response_id,
"object": "response",
"status": status,
"created_at": created_at,
"model": model,
}
return env
final_response_text = ""
agent_error: Optional[str] = None
usage: Dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
try:
# response.created — initial envelope, status=in_progress
created_env = _envelope("in_progress")
created_env["output"] = []
await _write_event("response.created", {
"type": "response.created",
"response": created_env,
})
last_activity = time.monotonic()
async def _open_message_item() -> None:
"""Emit response.output_item.added for the assistant message
the first time any text delta arrives."""
nonlocal message_opened, message_output_index, output_index
if message_opened:
return
message_opened = True
message_output_index = output_index
output_index += 1
item = {
"id": message_item_id,
"type": "message",
"status": "in_progress",
"role": "assistant",
"content": [],
}
await _write_event("response.output_item.added", {
"type": "response.output_item.added",
"output_index": message_output_index,
"item": item,
})
async def _emit_text_delta(delta_text: str) -> None:
await _open_message_item()
final_text_parts.append(delta_text)
await _write_event("response.output_text.delta", {
"type": "response.output_text.delta",
"item_id": message_item_id,
"output_index": message_output_index,
"content_index": 0,
"delta": delta_text,
"logprobs": [],
})
async def _emit_tool_started(payload: Dict[str, Any]) -> str:
"""Emit response.output_item.added for a function_call.
Returns the call_id so the matching completion event can
reference it. Prefer the real ``tool_call_id`` from the
agent when available; fall back to a generated call id for
safety in tests or older code paths.
"""
nonlocal output_index, call_counter
call_counter += 1
call_id = payload.get("tool_call_id") or f"call_{response_id[5:]}_{call_counter}"
args = payload.get("arguments", {})
if isinstance(args, dict):
arguments_str = json.dumps(args)
else:
arguments_str = str(args)
item = {
"id": f"fc_{uuid.uuid4().hex[:24]}",
"type": "function_call",
"status": "in_progress",
"name": payload.get("name", ""),
"call_id": call_id,
"arguments": arguments_str,
}
idx = output_index
output_index += 1
pending_tool_calls.append({
"call_id": call_id,
"name": payload.get("name", ""),
"arguments": arguments_str,
"item_id": item["id"],
"output_index": idx,
})
emitted_items.append({
"type": "function_call",
"name": payload.get("name", ""),
"arguments": arguments_str,
"call_id": call_id,
})
await _write_event("response.output_item.added", {
"type": "response.output_item.added",
"output_index": idx,
"item": item,
})
return call_id
async def _emit_tool_completed(payload: Dict[str, Any]) -> None:
"""Emit response.output_item.done (function_call) followed
by response.output_item.added (function_call_output)."""
nonlocal output_index
call_id = payload.get("tool_call_id")
result = payload.get("result", "")
pending = None
if call_id:
for i, p in enumerate(pending_tool_calls):
if p["call_id"] == call_id:
pending = pending_tool_calls.pop(i)
break
if pending is None:
# Completion without a matching start — skip to avoid
# emitting orphaned done events.
return
# function_call done
done_item = {
"id": pending["item_id"],
"type": "function_call",
"status": "completed",
"name": pending["name"],
"call_id": pending["call_id"],
"arguments": pending["arguments"],
}
await _write_event("response.output_item.done", {
"type": "response.output_item.done",
"output_index": pending["output_index"],
"item": done_item,
})
# function_call_output added (result)
result_str = result if isinstance(result, str) else json.dumps(result)
output_parts = [{"type": "input_text", "text": result_str}]
output_item = {
"id": f"fco_{uuid.uuid4().hex[:24]}",
"type": "function_call_output",
"call_id": pending["call_id"],
"output": output_parts,
"status": "completed",
}
idx = output_index
output_index += 1
emitted_items.append({
"type": "function_call_output",
"call_id": pending["call_id"],
"output": output_parts,
})
await _write_event("response.output_item.added", {
"type": "response.output_item.added",
"output_index": idx,
"item": output_item,
})
await _write_event("response.output_item.done", {
"type": "response.output_item.done",
"output_index": idx,
"item": output_item,
})
# Main drain loop — thread-safe queue fed by agent callbacks.
async def _dispatch(it) -> None:
"""Route a queue item to the correct SSE emitter.
Plain strings are text deltas. Tagged tuples with
``__tool_started__`` / ``__tool_completed__`` prefixes
are tool lifecycle events.
"""
if isinstance(it, tuple) and len(it) == 2 and isinstance(it[0], str):
tag, payload = it
if tag == "__tool_started__":
await _emit_tool_started(payload)
elif tag == "__tool_completed__":
await _emit_tool_completed(payload)
# Unknown tags are silently ignored (forward-compat).
elif isinstance(it, str):
await _emit_text_delta(it)
# Other types (non-string, non-tuple) are silently dropped.
loop = asyncio.get_event_loop()
while True:
try:
item = await loop.run_in_executor(None, lambda: stream_q.get(timeout=0.5))
except _q.Empty:
if agent_task.done():
# Drain remaining
while True:
try:
item = stream_q.get_nowait()
if item is None:
break
await _dispatch(item)
last_activity = time.monotonic()
except _q.Empty:
break
break
if time.monotonic() - last_activity >= CHAT_COMPLETIONS_SSE_KEEPALIVE_SECONDS:
await response.write(b": keepalive\n\n")
last_activity = time.monotonic()
continue
if item is None: # EOS sentinel
break
await _dispatch(item)
last_activity = time.monotonic()
# Pick up agent result + usage from the completed task
try:
result, agent_usage = await agent_task
usage = agent_usage or usage
# If the agent produced a final_response but no text
# deltas were streamed (e.g. some providers only emit
# the full response at the end), emit a single fallback
# delta so Responses clients still receive a live text part.
agent_final = result.get("final_response", "") if isinstance(result, dict) else ""
if agent_final and not final_text_parts:
await _emit_text_delta(agent_final)
if agent_final and not final_response_text:
final_response_text = agent_final
if isinstance(result, dict) and result.get("error") and not final_response_text:
agent_error = result["error"]
except Exception as e: # noqa: BLE001
logger.error("Error running agent for streaming responses: %s", e, exc_info=True)
agent_error = str(e)
# Close the message item if it was opened
final_response_text = "".join(final_text_parts) or final_response_text
if message_opened:
await _write_event("response.output_text.done", {
"type": "response.output_text.done",
"item_id": message_item_id,
"output_index": message_output_index,
"content_index": 0,
"text": final_response_text,
"logprobs": [],
})
msg_done_item = {
"id": message_item_id,
"type": "message",
"status": "completed",
"role": "assistant",
"content": [
{"type": "output_text", "text": final_response_text}
],
}
await _write_event("response.output_item.done", {
"type": "response.output_item.done",
"output_index": message_output_index,
"item": msg_done_item,
})
# Always append a final message item in the completed
# response envelope so clients that only parse the terminal
# payload still see the assistant text. This mirrors the
# shape produced by _extract_output_items in the batch path.
final_items: List[Dict[str, Any]] = list(emitted_items)
final_items.append({
"type": "message",
"role": "assistant",
"content": [
{"type": "output_text", "text": final_response_text or (agent_error or "")}
],
})
if agent_error:
failed_env = _envelope("failed")
failed_env["output"] = final_items
failed_env["error"] = {"message": agent_error, "type": "server_error"}
failed_env["usage"] = {
"input_tokens": usage.get("input_tokens", 0),
"output_tokens": usage.get("output_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
}
await _write_event("response.failed", {
"type": "response.failed",
"response": failed_env,
})
else:
completed_env = _envelope("completed")
completed_env["output"] = final_items
completed_env["usage"] = {
"input_tokens": usage.get("input_tokens", 0),
"output_tokens": usage.get("output_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
}
await _write_event("response.completed", {
"type": "response.completed",
"response": completed_env,
})
# Persist for future chaining / GET retrieval, mirroring
# the batch path behavior.
if store:
full_history = list(conversation_history)
full_history.append({"role": "user", "content": user_message})
if isinstance(result, dict) and result.get("messages"):
full_history.extend(result["messages"])
else:
full_history.append({"role": "assistant", "content": final_response_text})
self._response_store.put(response_id, {
"response": completed_env,
"conversation_history": full_history,
"instructions": instructions,
"session_id": session_id,
})
if conversation:
self._response_store.set_conversation(conversation, response_id)
except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError, OSError):
# Client disconnected — interrupt the agent so it stops
# making upstream LLM calls, then cancel the task.
agent = agent_ref[0] if agent_ref else None
if agent is not None:
try:
agent.interrupt("SSE client disconnected")
except Exception:
pass
if not agent_task.done():
agent_task.cancel()
try:
await agent_task
except (asyncio.CancelledError, Exception):
pass
logger.info("SSE client disconnected; interrupted agent task %s", response_id)
return response
async def _handle_responses(self, request: "web.Request") -> "web.Response":
"""POST /v1/responses — OpenAI Responses API format."""
auth_err = self._check_auth(request)
@ -1035,11 +1460,13 @@ class APIServerAdapter(BasePlatformAdapter):
if previous_response_id:
logger.debug("Both conversation_history and previous_response_id provided; using conversation_history")
stored_session_id = None
if not conversation_history and previous_response_id:
stored = self._response_store.get(previous_response_id)
if stored is None:
return web.json_response(_openai_error(f"Previous response not found: {previous_response_id}"), status=404)
conversation_history = list(stored.get("conversation_history", []))
stored_session_id = stored.get("session_id")
# If no instructions provided, carry forward from previous
if instructions is None:
instructions = stored.get("instructions")
@ -1057,8 +1484,83 @@ class APIServerAdapter(BasePlatformAdapter):
if body.get("truncation") == "auto" and len(conversation_history) > 100:
conversation_history = conversation_history[-100:]
# Run the agent (with Idempotency-Key support)
session_id = str(uuid.uuid4())
# Reuse session from previous_response_id chain so the dashboard
# groups the entire conversation under one session entry.
session_id = stored_session_id or str(uuid.uuid4())
stream = bool(body.get("stream", False))
if stream:
# Streaming branch — emit OpenAI Responses SSE events as the
# agent runs so frontends can render text deltas and tool
# calls in real time. See _write_sse_responses for details.
import queue as _q
_stream_q: _q.Queue = _q.Queue()
def _on_delta(delta):
# None from the agent is a CLI box-close signal, not EOS.
# Forwarding would kill the SSE stream prematurely; the
# SSE writer detects completion via agent_task.done().
if delta is not None:
_stream_q.put(delta)
def _on_tool_progress(event_type, name, preview, args, **kwargs):
"""Queue non-start tool progress events if needed in future.
The structured Responses stream uses ``tool_start_callback``
and ``tool_complete_callback`` for exact call-id correlation,
so progress events are currently ignored here.
"""
return
def _on_tool_start(tool_call_id, function_name, function_args):
"""Queue a started tool for live function_call streaming."""
_stream_q.put(("__tool_started__", {
"tool_call_id": tool_call_id,
"name": function_name,
"arguments": function_args or {},
}))
def _on_tool_complete(tool_call_id, function_name, function_args, function_result):
"""Queue a completed tool result for live function_call_output streaming."""
_stream_q.put(("__tool_completed__", {
"tool_call_id": tool_call_id,
"name": function_name,
"arguments": function_args or {},
"result": function_result,
}))
agent_ref = [None]
agent_task = asyncio.ensure_future(self._run_agent(
user_message=user_message,
conversation_history=conversation_history,
ephemeral_system_prompt=instructions,
session_id=session_id,
stream_delta_callback=_on_delta,
tool_progress_callback=_on_tool_progress,
tool_start_callback=_on_tool_start,
tool_complete_callback=_on_tool_complete,
agent_ref=agent_ref,
))
response_id = f"resp_{uuid.uuid4().hex[:28]}"
model_name = body.get("model", self._model_name)
created_at = int(time.time())
return await self._write_sse_responses(
request=request,
response_id=response_id,
model=model_name,
created_at=created_at,
stream_q=_stream_q,
agent_task=agent_task,
agent_ref=agent_ref,
conversation_history=conversation_history,
user_message=user_message,
instructions=instructions,
conversation=conversation,
store=store,
session_id=session_id,
)
async def _compute_response():
return await self._run_agent(
@ -1133,6 +1635,7 @@ class APIServerAdapter(BasePlatformAdapter):
"response": response_data,
"conversation_history": full_history,
"instructions": instructions,
"session_id": session_id,
})
# Update conversation mapping so the next request with the same
# conversation name automatically chains to this response
@ -1486,6 +1989,8 @@ class APIServerAdapter(BasePlatformAdapter):
session_id: Optional[str] = None,
stream_delta_callback=None,
tool_progress_callback=None,
tool_start_callback=None,
tool_complete_callback=None,
agent_ref: Optional[list] = None,
) -> tuple:
"""
@ -1507,6 +2012,8 @@ class APIServerAdapter(BasePlatformAdapter):
session_id=session_id,
stream_delta_callback=stream_delta_callback,
tool_progress_callback=tool_progress_callback,
tool_start_callback=tool_start_callback,
tool_complete_callback=tool_complete_callback,
)
if agent_ref is not None:
agent_ref[0] = agent
@ -1643,10 +2150,12 @@ class APIServerAdapter(BasePlatformAdapter):
if previous_response_id:
logger.debug("Both conversation_history and previous_response_id provided; using conversation_history")
stored_session_id = None
if not conversation_history and previous_response_id:
stored = self._response_store.get(previous_response_id)
if stored:
conversation_history = list(stored.get("conversation_history", []))
stored_session_id = stored.get("session_id")
if instructions is None:
instructions = stored.get("instructions")
@ -1665,7 +2174,7 @@ class APIServerAdapter(BasePlatformAdapter):
)
conversation_history.append({"role": msg["role"], "content": str(content)})
session_id = body.get("session_id") or run_id
session_id = body.get("session_id") or stored_session_id or run_id
ephemeral_system_prompt = instructions
async def _run_and_close():

View file

@ -3905,14 +3905,11 @@ class GatewayRunner:
# intermediate reasoning) so sessions can be resumed with full context
# and transcripts are useful for debugging and training data.
#
# IMPORTANT: When the agent failed before producing any response
# (e.g. context-overflow 400), do NOT persist the user's message.
# IMPORTANT: When the agent failed (e.g. context-overflow 400,
# compression exhausted), do NOT persist the user's message.
# Persisting it would make the session even larger, causing the
# same failure on the next attempt — an infinite loop. (#1630)
agent_failed_early = (
agent_result.get("failed")
and not agent_result.get("final_response")
)
# same failure on the next attempt — an infinite loop. (#1630, #9893)
agent_failed_early = bool(agent_result.get("failed"))
if agent_failed_early:
logger.info(
"Skipping transcript persistence for failed request in "
@ -3920,6 +3917,24 @@ class GatewayRunner:
session_entry.session_id,
)
# When compression is exhausted, the session is permanently too
# large to process. Auto-reset it so the next message starts
# fresh instead of replaying the same oversized context in an
# infinite fail loop. (#9893)
if agent_result.get("compression_exhausted") and session_entry and session_key:
logger.info(
"Auto-resetting session %s after compression exhaustion.",
session_entry.session_id,
)
self.session_store.reset_session(session_key)
self._evict_cached_agent(session_key)
self._session_model_overrides.pop(session_key, None)
response = (response or "") + (
"\n\n🔄 Session auto-reset — the conversation exceeded the "
"maximum context size and could not be compressed further. "
"Your next message will start a fresh session."
)
ts = datetime.now().isoformat()
# If this is a fresh session (no history), write the full tool
@ -4027,6 +4042,8 @@ class GatewayRunner:
_hist_len = len(history) if 'history' in locals() else 0
if status_code == 401:
status_hint = " Check your API key or run `claude /login` to refresh OAuth credentials."
elif status_code == 402:
status_hint = " Your API balance or quota is exhausted. Check your provider dashboard."
elif status_code == 429:
# Check if this is a plan usage limit (resets on a schedule) vs a transient rate limit
_err_body = getattr(e, "response", None)
@ -8721,6 +8738,8 @@ class GatewayRunner:
"final_response": error_msg,
"messages": result.get("messages", []),
"api_calls": result.get("api_calls", 0),
"failed": result.get("failed", False),
"compression_exhausted": result.get("compression_exhausted", False),
"tools": tools_holder[0] or [],
"history_offset": len(agent_history),
"last_prompt_tokens": _last_prompt_toks,

View file

@ -2919,6 +2919,15 @@ def gateway_command(args):
elif subcmd == "start":
system = getattr(args, 'system', False)
start_all = getattr(args, 'all', False)
if start_all:
# Kill all stale gateway processes across all profiles before starting
killed = kill_gateway_processes(all_profiles=True)
if killed:
print(f"✓ Killed {killed} stale gateway process(es) across all profiles")
_wait_for_gateway_exit(timeout=10.0, force_after=5.0)
if is_termux():
print("Gateway service start is not supported on Termux because there is no system service manager.")
print("Run manually: hermes gateway")
@ -3004,7 +3013,39 @@ def gateway_command(args):
# Try service first, fall back to killing and restarting
service_available = False
system = getattr(args, 'system', False)
restart_all = getattr(args, 'all', False)
service_configured = False
if restart_all:
# --all: stop every gateway process across all profiles, then start fresh
service_stopped = False
if supports_systemd_services() and (get_systemd_unit_path(system=False).exists() or get_systemd_unit_path(system=True).exists()):
try:
systemd_stop(system=system)
service_stopped = True
except subprocess.CalledProcessError:
pass
elif is_macos() and get_launchd_plist_path().exists():
try:
launchd_stop()
service_stopped = True
except subprocess.CalledProcessError:
pass
killed = kill_gateway_processes(all_profiles=True)
total = killed + (1 if service_stopped else 0)
if total:
print(f"✓ Stopped {total} gateway process(es) across all profiles")
_wait_for_gateway_exit(timeout=10.0, force_after=5.0)
# Start the current profile's service fresh
print("Starting gateway...")
if supports_systemd_services() and (get_systemd_unit_path(system=False).exists() or get_systemd_unit_path(system=True).exists()):
systemd_start(system=system)
elif is_macos() and get_launchd_plist_path().exists():
launchd_start()
else:
run_gateway(verbose=0)
return
if supports_systemd_services() and (get_systemd_unit_path(system=False).exists() or get_systemd_unit_path(system=True).exists()):
service_configured = True

View file

@ -5022,6 +5022,7 @@ For more help on a command:
# gateway start
gateway_start = gateway_subparsers.add_parser("start", help="Start the installed systemd/launchd background service")
gateway_start.add_argument("--system", action="store_true", help="Target the Linux system-level gateway service")
gateway_start.add_argument("--all", action="store_true", help="Kill ALL stale gateway processes across all profiles before starting")
# gateway stop
gateway_stop = gateway_subparsers.add_parser("stop", help="Stop gateway service")
@ -5031,6 +5032,7 @@ For more help on a command:
# gateway restart
gateway_restart = gateway_subparsers.add_parser("restart", help="Restart gateway service")
gateway_restart.add_argument("--system", action="store_true", help="Target the Linux system-level gateway service")
gateway_restart.add_argument("--all", action="store_true", help="Kill ALL gateway processes across all profiles before restarting")
# gateway status
gateway_status = gateway_subparsers.add_parser("status", help="Show gateway status")

View file

@ -26,7 +26,7 @@ import logging
import threading
from typing import Dict, Any, List, Optional, Tuple
from tools.registry import registry
from tools.registry import discover_builtin_tools, registry
from toolsets import resolve_toolset, validate_toolset
logger = logging.getLogger(__name__)
@ -129,45 +129,7 @@ def _run_async(coro):
# Tool Discovery (importing each module triggers its registry.register calls)
# =============================================================================
def _discover_tools():
"""Import all tool modules to trigger their registry.register() calls.
Wrapped in a function so import errors in optional tools (e.g., fal_client
not installed) don't prevent the rest from loading.
"""
_modules = [
"tools.web_tools",
"tools.terminal_tool",
"tools.file_tools",
"tools.vision_tools",
"tools.mixture_of_agents_tool",
"tools.image_generation_tool",
"tools.skills_tool",
"tools.skill_manager_tool",
"tools.browser_tool",
"tools.cronjob_tools",
"tools.rl_training_tool",
"tools.tts_tool",
"tools.todo_tool",
"tools.memory_tool",
"tools.session_search_tool",
"tools.clarify_tool",
"tools.code_execution_tool",
"tools.delegate_tool",
"tools.process_registry",
"tools.send_message_tool",
# "tools.honcho_tools", # Removed — Honcho is now a memory provider plugin
"tools.homeassistant_tool",
]
import importlib
for mod_name in _modules:
try:
importlib.import_module(mod_name)
except Exception as e:
logger.warning("Could not import tool module %s: %s", mod_name, e)
_discover_tools()
discover_builtin_tools()
# MCP tool discovery (external MCP servers from config)
try:

View file

@ -8013,6 +8013,15 @@ class AIAgent:
# skipping them because conversation_history is still the
# pre-compression length.
conversation_history = None
# Fix: reset retry counters after compression so the model
# gets a fresh budget on the compressed context. Without
# this, pre-compression retries carry over and the model
# hits "(empty)" immediately after compression-induced
# context loss.
self._empty_content_retries = 0
self._thinking_prefill_retries = 0
self._last_content_with_tools = None
self._mute_post_response = False
# Re-estimate after compression
_preflight_tokens = estimate_request_tokens_rough(
messages,
@ -9304,7 +9313,9 @@ class AIAgent:
"completed": False,
"api_calls": api_call_count,
"error": f"Request payload too large: max compression attempts ({max_compression_attempts}) reached.",
"partial": True
"partial": True,
"failed": True,
"compression_exhausted": True,
}
self._emit_status(f"⚠️ Request payload too large (413) — compression attempt {compression_attempts}/{max_compression_attempts}...")
@ -9333,7 +9344,9 @@ class AIAgent:
"completed": False,
"api_calls": api_call_count,
"error": "Request payload too large (413). Cannot compress further.",
"partial": True
"partial": True,
"failed": True,
"compression_exhausted": True,
}
# Check for context-length errors BEFORE generic 4xx handler.
@ -9384,7 +9397,9 @@ class AIAgent:
"completed": False,
"api_calls": api_call_count,
"error": f"Context length exceeded: max compression attempts ({max_compression_attempts}) reached.",
"partial": True
"partial": True,
"failed": True,
"compression_exhausted": True,
}
restart_with_compressed_messages = True
break
@ -9434,7 +9449,9 @@ class AIAgent:
"completed": False,
"api_calls": api_call_count,
"error": f"Context length exceeded: max compression attempts ({max_compression_attempts}) reached.",
"partial": True
"partial": True,
"failed": True,
"compression_exhausted": True,
}
self._emit_status(f"🗜️ Context too large (~{approx_tokens:,} tokens) — compressing ({compression_attempts}/{max_compression_attempts})...")
@ -9465,7 +9482,9 @@ class AIAgent:
"completed": False,
"api_calls": api_call_count,
"error": f"Context length exceeded ({approx_tokens:,} tokens). Cannot compress further.",
"partial": True
"partial": True,
"failed": True,
"compression_exhausted": True,
}
# Check for non-retryable client errors. The classifier
@ -10203,6 +10222,13 @@ class AIAgent:
# No tool calls - this is the final response
final_response = assistant_message.content or ""
# Fix: unmute output when entering the no-tool-call branch
# so the user can see empty-response warnings and recovery
# status messages. _mute_post_response was set during a
# prior housekeeping tool turn and should not silence the
# final response path.
self._mute_post_response = False
# Check if response only has think block with no actual content after it
if not self._has_content_after_think_block(final_response):
# ── Partial stream recovery ─────────────────────
@ -10240,16 +10266,10 @@ class AIAgent:
self._emit_status("↻ Empty response after tool calls — using earlier content as final answer")
self._last_content_with_tools = None
self._empty_content_retries = 0
for i in range(len(messages) - 1, -1, -1):
msg = messages[i]
if msg.get("role") == "assistant" and msg.get("tool_calls"):
tool_names = []
for tc in msg["tool_calls"]:
if not tc or not isinstance(tc, dict): continue
fn = tc.get("function", {})
tool_names.append(fn.get("name", "unknown"))
msg["content"] = f"Calling the {', '.join(tool_names)} tool{'s' if len(tool_names) > 1 else ''}..."
break
# Do NOT modify the assistant message content — the
# old code injected "Calling the X tools..." which
# poisoned the conversation history. Just use the
# fallback text as the final response and break.
final_response = self._strip_think_blocks(fallback).strip()
self._response_was_previewed = True
break

View file

@ -117,6 +117,8 @@ AUTHOR_MAP = {
"m@statecraft.systems": "mbierling",
"balyan.sid@gmail.com": "balyansid",
"oluwadareab12@gmail.com": "bennytimz",
"simon@simonmarcus.org": "simon-marcus",
"1243352777@qq.com": "zons-zhaozhy",
# ── bulk addition: 75 emails resolved via API, PR salvage bodies, noreply
# crossref, and GH contributor list matching (April 2026 audit) ──
"1115117931@qq.com": "aaronagent",

View file

@ -650,9 +650,9 @@ registry.register(
)
```
**2. Add import** in `model_tools.py` `_discover_tools()` list.
**2. Add to `toolsets.py`** → `_HERMES_CORE_TOOLS` list.
**3. Add to `toolsets.py`** → `_HERMES_CORE_TOOLS` list.
Auto-discovery: any `tools/*.py` file with a top-level `registry.register()` call is imported automatically — no manual list needed.
All handlers must return JSON strings. Use `get_hermes_home()` for paths, never hardcode `~/.hermes`.
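A sketch of the path rule in practice; the `get_hermes_home` import path is hypothetical, so adjust it to wherever the helper actually lives:
```python
# Hypothetical handler fragment: resolve files under the Hermes home dir.
import json
from pathlib import Path

from hermes_config import get_hermes_home  # assumption: module name may differ


def _notes_handler(args, **kwargs):
    # Never hardcode ~/.hermes; the home directory is profile-dependent.
    notes_file = Path(get_hermes_home()) / "notes.json"
    if not notes_file.exists():
        return json.dumps({"notes": []})
    return json.dumps({"notes": json.loads(notes_file.read_text())})
```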

View file

@ -1016,6 +1016,47 @@ class TestResponsesEndpoint:
assert len(call_kwargs["conversation_history"]) > 0
assert call_kwargs["user_message"] == "Now add 1 more"
@pytest.mark.asyncio
async def test_previous_response_id_preserves_session(self, adapter):
"""Chained responses via previous_response_id reuse the same session_id."""
mock_result = {
"final_response": "ok",
"messages": [{"role": "assistant", "content": "ok"}],
"api_calls": 1,
}
usage = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
# First request — establishes a session
with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run:
mock_run.return_value = (mock_result, usage)
resp1 = await cli.post(
"/v1/responses",
json={"model": "hermes-agent", "input": "Hello"},
)
assert resp1.status == 200
first_session_id = mock_run.call_args.kwargs["session_id"]
data1 = await resp1.json()
response_id = data1["id"]
# Second request — chains from the first
with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run:
mock_run.return_value = (mock_result, usage)
resp2 = await cli.post(
"/v1/responses",
json={
"model": "hermes-agent",
"input": "Follow up",
"previous_response_id": response_id,
},
)
assert resp2.status == 200
second_session_id = mock_run.call_args.kwargs["session_id"]
# Session must be the same across the chain
assert first_session_id == second_session_id
@pytest.mark.asyncio
async def test_invalid_previous_response_id_returns_404(self, adapter):
app = _create_app(adapter)
@ -1115,6 +1156,134 @@ class TestResponsesEndpoint:
assert resp.status == 400
class TestResponsesStreaming:
@pytest.mark.asyncio
async def test_stream_true_returns_responses_sse(self, adapter):
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
async def _mock_run_agent(**kwargs):
cb = kwargs.get("stream_delta_callback")
if cb:
cb("Hello")
cb(" world")
return (
{"final_response": "Hello world", "messages": [], "api_calls": 1},
{"input_tokens": 10, "output_tokens": 5, "total_tokens": 15},
)
with patch.object(adapter, "_run_agent", side_effect=_mock_run_agent):
resp = await cli.post(
"/v1/responses",
json={"model": "hermes-agent", "input": "hi", "stream": True},
)
assert resp.status == 200
assert "text/event-stream" in resp.headers.get("Content-Type", "")
body = await resp.text()
assert "event: response.created" in body
assert "event: response.output_text.delta" in body
assert "event: response.output_text.done" in body
assert "event: response.completed" in body
assert '"sequence_number":' in body
assert '"logprobs": []' in body
assert "Hello" in body
assert " world" in body
@pytest.mark.asyncio
async def test_stream_emits_function_call_and_output_items(self, adapter):
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
async def _mock_run_agent(**kwargs):
start_cb = kwargs.get("tool_start_callback")
complete_cb = kwargs.get("tool_complete_callback")
text_cb = kwargs.get("stream_delta_callback")
if start_cb:
start_cb("call_123", "read_file", {"path": "/tmp/test.txt"})
if complete_cb:
complete_cb("call_123", "read_file", {"path": "/tmp/test.txt"}, '{"content":"hello"}')
if text_cb:
text_cb("Done.")
return (
{
"final_response": "Done.",
"messages": [
{
"role": "assistant",
"tool_calls": [
{
"id": "call_123",
"function": {
"name": "read_file",
"arguments": '{"path":"/tmp/test.txt"}',
},
}
],
},
{
"role": "tool",
"tool_call_id": "call_123",
"content": '{"content":"hello"}',
},
],
"api_calls": 1,
},
{"input_tokens": 10, "output_tokens": 5, "total_tokens": 15},
)
with patch.object(adapter, "_run_agent", side_effect=_mock_run_agent):
resp = await cli.post(
"/v1/responses",
json={"model": "hermes-agent", "input": "read the file", "stream": True},
)
assert resp.status == 200
body = await resp.text()
assert "event: response.output_item.added" in body
assert "event: response.output_item.done" in body
assert body.count("event: response.output_item.done") >= 2
assert '"type": "function_call"' in body
assert '"type": "function_call_output"' in body
assert '"call_id": "call_123"' in body
assert '"name": "read_file"' in body
assert '"output": [{"type": "input_text", "text": "{\\"content\\":\\"hello\\"}"}]' in body
@pytest.mark.asyncio
async def test_streamed_response_is_stored_for_get(self, adapter):
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
async def _mock_run_agent(**kwargs):
cb = kwargs.get("stream_delta_callback")
if cb:
cb("Stored response")
return (
{"final_response": "Stored response", "messages": [], "api_calls": 1},
{"input_tokens": 1, "output_tokens": 2, "total_tokens": 3},
)
with patch.object(adapter, "_run_agent", side_effect=_mock_run_agent):
resp = await cli.post(
"/v1/responses",
json={"model": "hermes-agent", "input": "store this", "stream": True},
)
body = await resp.text()
response_id = None
for line in body.splitlines():
if line.startswith("data: "):
try:
payload = json.loads(line[len("data: "):])
except json.JSONDecodeError:
continue
if payload.get("type") == "response.completed":
response_id = payload["response"]["id"]
break
assert response_id
get_resp = await cli.get(f"/v1/responses/{response_id}")
assert get_resp.status == 200
data = await get_resp.json()
assert data["id"] == response_id
assert data["status"] == "completed"
assert data["output"][-1]["content"][0]["text"] == "Stored response"
# ---------------------------------------------------------------------------
# Auth on endpoints
# ---------------------------------------------------------------------------

View file

@ -136,33 +136,29 @@ class TestGatewaySkipsPersistenceOnFailure:
the gateway should NOT persist messages to the transcript."""
def test_agent_failed_early_detected(self):
"""The agent_failed_early flag is True when failed=True and
no final_response."""
"""The agent_failed_early flag is True when failed=True,
regardless of final_response."""
agent_result = {
"failed": True,
"final_response": None,
"messages": [],
"error": "Non-retryable client error",
}
agent_failed_early = (
agent_result.get("failed")
and not agent_result.get("final_response")
)
agent_failed_early = bool(agent_result.get("failed"))
assert agent_failed_early
def test_agent_with_response_not_failed_early(self):
"""When the agent has a final_response, it's not a failed-early
scenario even if failed=True."""
def test_agent_failed_with_error_response_still_detected(self):
"""When _run_agent_blocking converts an error to final_response,
the failed flag should still trigger agent_failed_early. This
was the core bug in #9893 — the old guard checked
``not final_response`` which was always truthy after conversion."""
agent_result = {
"failed": True,
"final_response": "Here is a partial response",
"final_response": "⚠️ Request payload too large: max compression attempts reached.",
"messages": [],
}
agent_failed_early = (
agent_result.get("failed")
and not agent_result.get("final_response")
)
assert not agent_failed_early
agent_failed_early = bool(agent_result.get("failed"))
assert agent_failed_early
def test_successful_agent_not_failed_early(self):
"""A successful agent result should not trigger skip."""
@ -170,13 +166,41 @@ class TestGatewaySkipsPersistenceOnFailure:
"final_response": "Hello!",
"messages": [{"role": "assistant", "content": "Hello!"}],
}
agent_failed_early = (
agent_result.get("failed")
and not agent_result.get("final_response")
)
agent_failed_early = bool(agent_result.get("failed"))
assert not agent_failed_early
class TestCompressionExhaustedFlag:
"""When compression is exhausted, the agent should set both
failed=True and compression_exhausted=True so the gateway can
auto-reset the session. (#9893)"""
def test_compression_exhausted_returns_carry_flag(self):
"""Simulate the return dict from a compression-exhausted agent."""
agent_result = {
"messages": [],
"completed": False,
"api_calls": 3,
"error": "Request payload too large: max compression attempts (3) reached.",
"partial": True,
"failed": True,
"compression_exhausted": True,
}
assert agent_result.get("failed")
assert agent_result.get("compression_exhausted")
def test_normal_failure_not_compression_exhausted(self):
"""Non-compression failures should not have compression_exhausted."""
agent_result = {
"messages": [],
"completed": False,
"failed": True,
"error": "Invalid API response after 3 retries",
}
assert agent_result.get("failed")
assert not agent_result.get("compression_exhausted")
# ---------------------------------------------------------------------------
# Test 3: Context-overflow error messages
# ---------------------------------------------------------------------------

View file

@ -46,3 +46,59 @@ class TestFindDocker:
with patch("tools.environments.docker.shutil.which", return_value=None):
second = docker_mod.find_docker()
assert first == second == "/usr/local/bin/docker"
def test_env_var_override_takes_precedence(self, tmp_path):
"""HERMES_DOCKER_BINARY overrides PATH and known-location discovery."""
fake_binary = tmp_path / "podman"
fake_binary.write_text("#!/bin/sh\n")
fake_binary.chmod(0o755)
with patch.dict(os.environ, {"HERMES_DOCKER_BINARY": str(fake_binary)}), \
patch("tools.environments.docker.shutil.which", return_value="/usr/bin/docker"):
result = docker_mod.find_docker()
assert result == str(fake_binary)
def test_env_var_override_ignored_if_not_executable(self, tmp_path):
"""Non-executable HERMES_DOCKER_BINARY falls through to normal discovery."""
fake_binary = tmp_path / "podman"
fake_binary.write_text("#!/bin/sh\n")
fake_binary.chmod(0o644) # not executable
with patch.dict(os.environ, {"HERMES_DOCKER_BINARY": str(fake_binary)}), \
patch("tools.environments.docker.shutil.which", return_value="/usr/bin/docker"):
result = docker_mod.find_docker()
assert result == "/usr/bin/docker"
def test_env_var_override_ignored_if_nonexistent(self):
"""Non-existent HERMES_DOCKER_BINARY path falls through."""
with patch.dict(os.environ, {"HERMES_DOCKER_BINARY": "/nonexistent/podman"}), \
patch("tools.environments.docker.shutil.which", return_value="/usr/bin/docker"):
result = docker_mod.find_docker()
assert result == "/usr/bin/docker"
def test_podman_on_path_used_when_docker_missing(self):
"""When docker is not on PATH, podman is tried next."""
def which_side_effect(name):
if name == "docker":
return None
if name == "podman":
return "/usr/bin/podman"
return None
with patch("tools.environments.docker.shutil.which", side_effect=which_side_effect), \
patch("tools.environments.docker._DOCKER_SEARCH_PATHS", []):
result = docker_mod.find_docker()
assert result == "/usr/bin/podman"
def test_docker_preferred_over_podman(self):
"""When both docker and podman are on PATH, docker wins."""
def which_side_effect(name):
if name == "docker":
return "/usr/bin/docker"
if name == "podman":
return "/usr/bin/podman"
return None
with patch("tools.environments.docker.shutil.which", side_effect=which_side_effect):
result = docker_mod.find_docker()
assert result == "/usr/bin/docker"

View file

@ -2,8 +2,10 @@
import json
import threading
from pathlib import Path
from unittest.mock import patch
from tools.registry import ToolRegistry
from tools.registry import ToolRegistry, discover_builtin_tools
def _dummy_handler(args, **kwargs):
@ -286,6 +288,74 @@ class TestCheckFnExceptionHandling:
assert any(u["name"] == "crashes" for u in unavailable)
class TestBuiltinDiscovery:
def test_matches_previous_manual_builtin_tool_set(self):
expected = {
"tools.browser_tool",
"tools.clarify_tool",
"tools.code_execution_tool",
"tools.cronjob_tools",
"tools.delegate_tool",
"tools.file_tools",
"tools.homeassistant_tool",
"tools.image_generation_tool",
"tools.memory_tool",
"tools.mixture_of_agents_tool",
"tools.process_registry",
"tools.rl_training_tool",
"tools.send_message_tool",
"tools.session_search_tool",
"tools.skill_manager_tool",
"tools.skills_tool",
"tools.terminal_tool",
"tools.todo_tool",
"tools.tts_tool",
"tools.vision_tools",
"tools.web_tools",
}
with patch("tools.registry.importlib.import_module"):
imported = discover_builtin_tools(Path(__file__).resolve().parents[2] / "tools")
assert set(imported) == expected
def test_imports_only_self_registering_modules(self, tmp_path):
tools_dir = tmp_path / "tools"
tools_dir.mkdir()
(tools_dir / "__init__.py").write_text("", encoding="utf-8")
(tools_dir / "registry.py").write_text("", encoding="utf-8")
(tools_dir / "alpha.py").write_text(
"from tools.registry import registry\nregistry.register(name='alpha', toolset='x', schema={}, handler=lambda *_a, **_k: '{}')\n",
encoding="utf-8",
)
(tools_dir / "beta.py").write_text("VALUE = 1\n", encoding="utf-8")
with patch("tools.registry.importlib.import_module") as mock_import:
imported = discover_builtin_tools(tools_dir)
assert imported == ["tools.alpha"]
mock_import.assert_called_once_with("tools.alpha")
def test_skips_mcp_tool_even_if_it_registers(self, tmp_path):
tools_dir = tmp_path / "tools"
tools_dir.mkdir()
(tools_dir / "__init__.py").write_text("", encoding="utf-8")
(tools_dir / "mcp_tool.py").write_text(
"from tools.registry import registry\nregistry.register(name='mcp_alpha', toolset='mcp-test', schema={}, handler=lambda *_a, **_k: '{}')\n",
encoding="utf-8",
)
(tools_dir / "alpha.py").write_text(
"from tools.registry import registry\nregistry.register(name='alpha', toolset='x', schema={}, handler=lambda *_a, **_k: '{}')\n",
encoding="utf-8",
)
with patch("tools.registry.importlib.import_module") as mock_import:
imported = discover_builtin_tools(tools_dir)
assert imported == ["tools.alpha"]
mock_import.assert_called_once_with("tools.alpha")
class TestEmojiMetadata:
"""Verify per-tool emoji registration and lookup."""

View file

@ -99,23 +99,41 @@ def _load_hermes_env_vars() -> dict[str, str]:
def find_docker() -> Optional[str]:
"""Locate the docker CLI binary.
"""Locate the docker (or podman) CLI binary.
Checks ``shutil.which`` first (respects PATH), then probes well-known
install locations on macOS where Docker Desktop may not be in PATH
(e.g. when running as a gateway service via launchd).
Resolution order:
1. ``HERMES_DOCKER_BINARY`` env var explicit override (e.g. ``/usr/bin/podman``)
2. ``docker`` on PATH via ``shutil.which``
3. ``podman`` on PATH via ``shutil.which``
4. Well-known macOS Docker Desktop install locations
Returns the absolute path, or ``None`` if docker cannot be found.
Returns the absolute path, or ``None`` if neither runtime can be found.
"""
global _docker_executable
if _docker_executable is not None:
return _docker_executable
# 1. Explicit override via env var (e.g. for Podman on immutable distros)
override = os.getenv("HERMES_DOCKER_BINARY")
if override and os.path.isfile(override) and os.access(override, os.X_OK):
_docker_executable = override
logger.info("Using HERMES_DOCKER_BINARY override: %s", override)
return override
# 2. docker on PATH
found = shutil.which("docker")
if found:
_docker_executable = found
return found
# 3. podman on PATH (drop-in compatible for our use case)
found = shutil.which("podman")
if found:
_docker_executable = found
logger.info("Using podman as container runtime: %s", found)
return found
# 4. Well-known macOS Docker Desktop locations
for path in _DOCKER_SEARCH_PATHS:
if os.path.isfile(path) and os.access(path, os.X_OK):
_docker_executable = path

View file

@ -2036,7 +2036,7 @@ def register_mcp_servers(servers: Dict[str, dict]) -> List[str]:
def discover_mcp_tools() -> List[str]:
"""Entry point: load config, connect to MCP servers, register tools.
Called from ``model_tools._discover_tools()``. Safe to call even when
Called from ``model_tools`` after ``discover_builtin_tools()``. Safe to call even when
the ``mcp`` package is not installed (returns empty list).
Idempotent for already-connected servers. If some servers failed on a

View file

@ -14,14 +14,65 @@ Import chain (circular-import safe):
run_agent.py, cli.py, batch_runner.py, etc.
"""
import ast
import importlib
import json
import logging
import threading
from pathlib import Path
from typing import Callable, Dict, List, Optional, Set
logger = logging.getLogger(__name__)
def _is_registry_register_call(node: ast.AST) -> bool:
"""Return True when *node* is a ``registry.register(...)`` call expression."""
if not isinstance(node, ast.Expr) or not isinstance(node.value, ast.Call):
return False
func = node.value.func
return (
isinstance(func, ast.Attribute)
and func.attr == "register"
and isinstance(func.value, ast.Name)
and func.value.id == "registry"
)
def _module_registers_tools(module_path: Path) -> bool:
"""Return True when the module contains a top-level ``registry.register(...)`` call.
Only inspects module-body statements so that helper modules which happen
to call ``registry.register()`` inside a function are not picked up.
"""
try:
source = module_path.read_text(encoding="utf-8")
tree = ast.parse(source, filename=str(module_path))
except (OSError, SyntaxError):
return False
return any(_is_registry_register_call(stmt) for stmt in tree.body)
def discover_builtin_tools(tools_dir: Optional[Path] = None) -> List[str]:
"""Import built-in self-registering tool modules and return their module names."""
tools_path = Path(tools_dir) if tools_dir is not None else Path(__file__).resolve().parent
module_names = [
f"tools.{path.stem}"
for path in sorted(tools_path.glob("*.py"))
if path.name not in {"__init__.py", "registry.py", "mcp_tool.py"}
and _module_registers_tools(path)
]
imported: List[str] = []
for mod_name in module_names:
try:
importlib.import_module(mod_name)
imported.append(mod_name)
except Exception as e:
logger.warning("Could not import tool module %s: %s", mod_name, e)
return imported
class ToolEntry:
"""Metadata for a single registered tool."""

View file

@ -14,11 +14,12 @@ Make it a **Tool** when it requires end-to-end integration with API keys, custom
## Overview
Adding a tool touches **3 files**:
Adding a tool touches **2 files**:
1. **`tools/your_tool.py`** — handler, schema, check function, `registry.register()` call
2. **`toolsets.py`** — add tool name to `_HERMES_CORE_TOOLS` (or a specific toolset)
3. **`model_tools.py`** — add `"tools.your_tool"` to the `_discover_tools()` list
Any `tools/*.py` file with a top-level `registry.register()` call is auto-discovered at startup — no manual import list required.
## Step 1: Create the Tool File
@ -124,19 +125,9 @@ _HERMES_CORE_TOOLS = [
},
```
## Step 3: Add Discovery Import
## ~~Step 3: Add Discovery Import~~ (No longer needed)
In `model_tools.py`, add the module to the `_discover_tools()` list:
```python
def _discover_tools():
_modules = [
...
"tools.weather_tool", # <-- add here
]
```
This import triggers the `registry.register()` call at the bottom of your tool file.
Tool modules with a top-level `registry.register()` call are auto-discovered by `discover_builtin_tools()` in `tools/registry.py`. No manual import list to maintain — just create your file in `tools/` and it's picked up at startup.
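To verify a new module is picked up, the discovery function can be called directly; a quick sketch, with `tools.weather_tool` standing in for the hypothetical module from Step 1:
```python
# Run from the repo root with the venv active.
from tools.registry import discover_builtin_tools

imported = discover_builtin_tools()
print("tools.weather_tool" in imported)  # True once the module self-registers
```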
## Async Handlers

View file

@ -275,4 +275,4 @@ model_tools.py (imports tools/registry + triggers tool discovery)
run_agent.py, cli.py, batch_runner.py, environments/
```
This chain means tool registration happens at import time, before any agent instance is created. Adding a new tool requires an import in `model_tools.py`'s `_discover_tools()` list.
This chain means tool registration happens at import time, before any agent instance is created. Any `tools/*.py` file with a top-level `registry.register()` call is auto-discovered — no manual import list needed.

View file

@ -42,37 +42,23 @@ registry.register(
Each call creates a `ToolEntry` stored in the singleton `ToolRegistry._tools` dict keyed by tool name. If a name collision occurs across toolsets, a warning is logged and the later registration wins.
### Discovery: `_discover_tools()`
### Discovery: `discover_builtin_tools()`
When `model_tools.py` is imported, it calls `_discover_tools()` which imports every tool module in order:
When `model_tools.py` is imported, it calls `discover_builtin_tools()` from `tools/registry.py`. This function scans every `tools/*.py` file using AST parsing to find modules that contain top-level `registry.register()` calls, then imports them:
```python
_modules = [
"tools.web_tools",
"tools.terminal_tool",
"tools.file_tools",
"tools.vision_tools",
"tools.mixture_of_agents_tool",
"tools.image_generation_tool",
"tools.skills_tool",
"tools.skill_manager_tool",
"tools.browser_tool",
"tools.cronjob_tools",
"tools.rl_training_tool",
"tools.tts_tool",
"tools.todo_tool",
"tools.memory_tool",
"tools.session_search_tool",
"tools.clarify_tool",
"tools.code_execution_tool",
"tools.delegate_tool",
"tools.process_registry",
"tools.send_message_tool",
# "tools.honcho_tools", # Removed — Honcho is now a memory provider plugin
"tools.homeassistant_tool",
]
# tools/registry.py (simplified)
def discover_builtin_tools(tools_dir=None):
tools_path = Path(tools_dir) if tools_dir else Path(__file__).parent
for path in sorted(tools_path.glob("*.py")):
if path.name in {"__init__.py", "registry.py", "mcp_tool.py"}:
continue
if _module_registers_tools(path): # AST check for top-level registry.register()
importlib.import_module(f"tools.{path.stem}")
```
This auto-discovery means new tool files are picked up automatically — no manual list to maintain. The AST check only matches top-level `registry.register()` calls (not calls inside functions), so helper modules in `tools/` are not imported.
Each import triggers the module's `registry.register()` calls. Errors in optional tools (e.g., missing `fal_client` for image generation) are caught and logged — they don't prevent other tools from loading.
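As an illustration of the AST rule, compare two hypothetical module bodies (toolset names and handlers are placeholders following the registry's `(args, **kwargs)` handler convention):
```python
from tools.registry import registry

# tools/weather_tool.py (top-level call): picked up by discover_builtin_tools().
registry.register(
    name="weather", toolset="hermes_core", schema={},
    handler=lambda args, **kwargs: "{}",
)


# tools/_weather_helpers.py (call inside a function body): NOT picked up,
# because only module-body statements are inspected.
def register_weather_later():
    registry.register(
        name="weather_late", toolset="hermes_core", schema={},
        handler=lambda args, **kwargs: "{}",
    )
```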
After core tool discovery, MCP tools and plugin tools are also discovered:

View file

@ -83,9 +83,11 @@ Standard OpenAI Chat Completions format. Stateless — the full conversation is
}
```
**Streaming** (`"stream": true`): Returns Server-Sent Events (SSE) with token-by-token response chunks. When streaming is enabled in config, tokens are emitted live as the LLM generates them. When disabled, the full response is sent as a single SSE chunk.
**Streaming** (`"stream": true`): Returns Server-Sent Events (SSE) with token-by-token response chunks. For **Chat Completions**, the stream uses standard `chat.completion.chunk` events plus Hermes' custom `hermes.tool.progress` event for tool-start UX. For **Responses**, the stream uses OpenAI Responses event types such as `response.created`, `response.output_text.delta`, `response.output_item.added`, `response.output_item.done`, and `response.completed`.
**Tool progress in streams**: When the agent calls tools during a streaming request, brief progress indicators are injected into the content stream as the tools start executing (e.g. `` `💻 pwd` ``, `` `🔍 Python docs` ``). These appear as inline markdown before the agent's response text, giving frontends like Open WebUI real-time visibility into tool execution.
**Tool progress in streams**:
- **Chat Completions**: Hermes emits `event: hermes.tool.progress` for tool-start visibility without polluting persisted assistant text.
- **Responses**: Hermes emits spec-native `function_call` and `function_call_output` output items during the SSE stream, so clients can render structured tool UI in real time (see the client sketch below).
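A minimal client sketch for the Responses stream; the gateway URL and port are assumptions, while event and field names follow the stream documented above:
```python
# Consume the Responses SSE stream: print text deltas and tool lifecycle items.
import json

import requests

resp = requests.post(
    "http://localhost:8000/v1/responses",  # assumption: local gateway address
    json={"model": "hermes-agent", "input": "list files in /tmp", "stream": True},
    stream=True,
)
event_type = None
for line in resp.iter_lines(decode_unicode=True):
    if line.startswith("event: "):
        event_type = line[len("event: "):]
    elif line.startswith("data: "):
        data = json.loads(line[len("data: "):])
        if event_type == "response.output_text.delta":
            print(data["delta"], end="", flush=True)
        elif event_type == "response.output_item.added":
            item = data["item"]
            if item["type"] == "function_call":
                print(f"\n[tool start] {item['name']} {item['arguments']}")
            elif item["type"] == "function_call_output":
                print(f"[tool done] call_id={item['call_id']}")
```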
### POST /v1/responses
@ -128,7 +130,7 @@ Chain responses to maintain full context (including tool calls) across turns:
}
```
The server reconstructs the full conversation from the stored response chain — all previous tool calls and results are preserved.
The server reconstructs the full conversation from the stored response chain — all previous tool calls and results are preserved. Chained requests also share the same session, so multi-turn conversations appear as a single entry in the dashboard and session history.
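For example, a chained second request might look like this sketch (same assumed gateway address as above; the response shape follows the fields documented here):
```python
import requests

url = "http://localhost:8000/v1/responses"  # assumption: local gateway address
first = requests.post(url, json={"model": "hermes-agent", "input": "What is 2 + 2?"}).json()
follow = requests.post(url, json={
    "model": "hermes-agent",
    "input": "Now add 1 more",
    "previous_response_id": first["id"],  # chain: server replays stored history
}).json()
print(follow["output"][-1]["content"][0]["text"])
```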
#### Named conversations

View file

@ -134,10 +134,10 @@ To use the Responses API mode:
3. Change **API Type** from "Chat Completions" to **"Responses (Experimental)"**
4. Save
With the Responses API, Open WebUI sends requests in the Responses format (`input` array + `instructions`), and Hermes Agent can preserve full tool call history across turns via `previous_response_id`.
With the Responses API, Open WebUI sends requests in the Responses format (`input` array + `instructions`), and Hermes Agent can preserve full tool call history across turns via `previous_response_id`. When `stream: true`, Hermes also streams spec-native `function_call` and `function_call_output` items, which enables custom structured tool-call UI in clients that render Responses events.
:::note
Open WebUI currently manages conversation history client-side even in Responses mode — it sends the full message history in each request rather than using `previous_response_id`. The Responses API mode is mainly useful for future compatibility as frontends evolve.
Open WebUI currently manages conversation history client-side even in Responses mode — it sends the full message history in each request rather than using `previous_response_id`. The main advantage of Responses mode today is the structured event stream: text deltas, `function_call`, and `function_call_output` items arrive as OpenAI Responses SSE events instead of Chat Completions chunks.
:::
## How It Works