feat(api): structured run events via /v1/runs SSE endpoint

Add POST /v1/runs to start async agent runs and GET /v1/runs/{run_id}/events
for SSE streaming of typed lifecycle events (tool.started, tool.completed,
message.delta, reasoning.available, run.completed, run.failed).

Changes the internal tool_progress_callback signature from positional
(tool_name, preview, args) to event-type-first
(event_type, tool_name, preview, args, **kwargs). Existing consumers
filter on event_type and remain backward-compatible.

Adds concurrency limit (_MAX_CONCURRENT_RUNS=10) and orphaned run sweep.

Fixes logic inversion in cli.py _on_tool_progress where the original PR
would have displayed internal tools instead of non-internal ones.

Co-authored-by: Mibayy <mibayy@users.noreply.github.com>
This commit is contained in:
Mibayy 2026-04-05 11:52:46 -07:00 committed by Teknium
parent e167ad8f61
commit cc2b56b26a
11 changed files with 337 additions and 44 deletions

View file

@ -54,14 +54,18 @@ def make_tool_progress_cb(
Signature expected by AIAgent:: Signature expected by AIAgent::
tool_progress_callback(name: str, preview: str, args: dict) tool_progress_callback(event_type: str, name: str, preview: str, args: dict, **kwargs)
Emits ``ToolCallStart`` for each tool invocation and tracks IDs in a FIFO Emits ``ToolCallStart`` for ``tool.started`` events and tracks IDs in a FIFO
queue per tool name so duplicate/parallel same-name calls still complete queue per tool name so duplicate/parallel same-name calls still complete
against the correct ACP tool call. against the correct ACP tool call. Other event types (``tool.completed``,
``reasoning.available``) are silently ignored.
""" """
def _tool_progress(name: str, preview: str, args: Any = None) -> None: def _tool_progress(event_type: str, name: str = None, preview: str = None, args: Any = None, **kwargs) -> None:
# Only emit ACP ToolCallStart for tool.started; ignore other event types
if event_type != "tool.started":
return
if isinstance(args, str): if isinstance(args, str):
try: try:
args = json.loads(args) args = json.loads(args)

View file

@ -12,7 +12,6 @@ import acp
from acp.schema import ( from acp.schema import (
AgentCapabilities, AgentCapabilities,
AuthenticateResponse, AuthenticateResponse,
AuthMethodAgent,
AvailableCommand, AvailableCommand,
AvailableCommandsUpdate, AvailableCommandsUpdate,
ClientCapabilities, ClientCapabilities,
@ -43,6 +42,12 @@ from acp.schema import (
Usage, Usage,
) )
# AuthMethodAgent was renamed from AuthMethod in agent-client-protocol 0.9.0
try:
from acp.schema import AuthMethodAgent
except ImportError:
from acp.schema import AuthMethod as AuthMethodAgent # type: ignore[attr-defined]
from acp_adapter.auth import detect_provider, has_provider from acp_adapter.auth import detect_provider, has_provider
from acp_adapter.events import ( from acp_adapter.events import (
make_message_cb, make_message_cb,

11
cli.py
View file

@ -5457,14 +5457,17 @@ class HermesCLI:
# Tool progress callback (audio cues for voice mode) # Tool progress callback (audio cues for voice mode)
# ==================================================================== # ====================================================================
def _on_tool_progress(self, function_name: str, preview: str, function_args: dict): def _on_tool_progress(self, event_type: str, function_name: str = None, preview: str = None, function_args: dict = None, **kwargs):
"""Called when a tool starts executing. """Called on tool lifecycle events (tool.started, tool.completed, reasoning.available, etc.).
Updates the TUI spinner widget so the user can see what the agent Updates the TUI spinner widget so the user can see what the agent
is doing during tool execution (fills the gap between thinking is doing during tool execution (fills the gap between thinking
spinner and next response). Also plays audio cue in voice mode. spinner and next response). Also plays audio cue in voice mode.
""" """
if not function_name.startswith("_"): # Only act on tool.started; ignore tool.completed, reasoning.available, etc.
if event_type != "tool.started":
return
if function_name and not function_name.startswith("_"):
from agent.display import get_tool_emoji from agent.display import get_tool_emoji
emoji = get_tool_emoji(function_name) emoji = get_tool_emoji(function_name)
label = preview or function_name label = preview or function_name
@ -5477,7 +5480,7 @@ class HermesCLI:
if not self._voice_mode: if not self._voice_mode:
return return
if function_name.startswith("_"): if not function_name or function_name.startswith("_"):
return return
try: try:
from tools.voice_mode import play_beep from tools.voice_mode import play_beep

View file

@ -7,6 +7,8 @@ Exposes an HTTP server with endpoints:
- GET /v1/responses/{response_id} Retrieve a stored response - GET /v1/responses/{response_id} Retrieve a stored response
- DELETE /v1/responses/{response_id} Delete a stored response - DELETE /v1/responses/{response_id} Delete a stored response
- GET /v1/models lists hermes-agent as an available model - GET /v1/models lists hermes-agent as an available model
- POST /v1/runs start a run, returns run_id immediately (202)
- GET /v1/runs/{run_id}/events SSE stream of structured lifecycle events
- GET /health health check - GET /health health check
Any OpenAI-compatible frontend (Open WebUI, LobeChat, LibreChat, Any OpenAI-compatible frontend (Open WebUI, LobeChat, LibreChat,
@ -300,6 +302,10 @@ class APIServerAdapter(BasePlatformAdapter):
self._runner: Optional["web.AppRunner"] = None self._runner: Optional["web.AppRunner"] = None
self._site: Optional["web.TCPSite"] = None self._site: Optional["web.TCPSite"] = None
self._response_store = ResponseStore() self._response_store = ResponseStore()
# Active run streams: run_id -> asyncio.Queue of SSE event dicts
self._run_streams: Dict[str, "asyncio.Queue[Optional[Dict]]"] = {}
# Creation timestamps for orphaned-run TTL sweep
self._run_streams_created: Dict[str, float] = {}
self._session_db: Optional[Any] = None # Lazy-init SessionDB for session continuity self._session_db: Optional[Any] = None # Lazy-init SessionDB for session continuity
@staticmethod @staticmethod
@ -1287,6 +1293,236 @@ class APIServerAdapter(BasePlatformAdapter):
return await loop.run_in_executor(None, _run) return await loop.run_in_executor(None, _run)
# ------------------------------------------------------------------
# /v1/runs — structured event streaming
# ------------------------------------------------------------------
# Upper bound on the number of entries in ``_run_streams``; POST /v1/runs
# answers 429 once this many runs are registered.
_MAX_CONCURRENT_RUNS = 10 # Prevent unbounded resource allocation
# Age in seconds, measured from run creation, after which the periodic sweep
# drops a run's stream and creation timestamp.
_RUN_STREAM_TTL = 300 # seconds before orphaned runs are swept
def _make_run_event_callback(self, run_id: str, loop: "asyncio.AbstractEventLoop"):
    """Build a ``tool_progress_callback`` that feeds the run's SSE queue.

    Args:
        run_id: Key of the run in ``self._run_streams``; it is also stamped
            on every emitted event.
        loop: Event loop on which the queue insertion is scheduled via
            ``call_soon_threadsafe``.

    Returns:
        A callable ``(event_type, tool_name=None, preview=None, args=None,
        **kwargs)``. It forwards ``tool.started``, ``tool.completed`` and
        ``reasoning.available`` as event dicts and ignores every other
        event type.
    """

    def _push(event: Dict[str, Any]) -> None:
        # Resolve the queue on every call so that events are dropped as soon
        # as the run's stream has been removed from ``_run_streams``.
        q = self._run_streams.get(run_id)
        if q is None:
            return
        try:
            loop.call_soon_threadsafe(q.put_nowait, event)
        except Exception:
            # Best effort: a failure to enqueue a progress event must never
            # propagate into the code that reported the progress.
            pass

    def _callback(event_type: str, tool_name: str = None, preview: str = None, args=None, **kwargs):
        ts = time.time()
        if event_type == "tool.started":
            _push({
                "event": "tool.started",
                "run_id": run_id,
                "timestamp": ts,
                "tool": tool_name,
                "preview": preview,
            })
        elif event_type == "tool.completed":
            # A missing, None or non-numeric duration must not make round()
            # raise and lose the whole event; report 0 instead.
            try:
                duration = round(kwargs.get("duration", 0), 3)
            except TypeError:
                duration = 0
            _push({
                "event": "tool.completed",
                "run_id": run_id,
                "timestamp": ts,
                "tool": tool_name,
                "duration": duration,
                "error": kwargs.get("is_error", False),
            })
        elif event_type == "reasoning.available":
            _push({
                "event": "reasoning.available",
                "run_id": run_id,
                "timestamp": ts,
                "text": preview or "",
            })
        # _thinking and subagent_progress are intentionally not forwarded

    return _callback
async def _handle_runs(self, request: "web.Request") -> "web.Response":
    """POST /v1/runs — start an agent run and return its ``run_id`` immediately.

    The JSON body must be an object. ``input`` is required and is either a
    string or a list whose last item is an object with a ``content`` field.
    ``instructions``, ``previous_response_id`` and ``session_id`` are
    optional.

    Returns:
        202 with ``{"run_id": ..., "status": "started"}`` once the run has
        been registered and scheduled; 400 for a malformed body or missing
        input; 429 when ``_MAX_CONCURRENT_RUNS`` streams are already
        registered; or the response produced by ``_check_auth``.
    """
    auth_err = self._check_auth(request)
    if auth_err:
        return auth_err

    def _limit_response() -> Optional["web.Response"]:
        # Return a 429 response when the concurrency cap is reached, else None.
        if len(self._run_streams) < self._MAX_CONCURRENT_RUNS:
            return None
        return web.json_response(
            _openai_error(f"Too many concurrent runs (max {self._MAX_CONCURRENT_RUNS})", code="rate_limit_exceeded"),
            status=429,
        )

    # Enforce concurrency limit (cheap rejection before reading the body)
    limit_err = _limit_response()
    if limit_err is not None:
        return limit_err

    try:
        body = await request.json()
    except Exception:
        return web.json_response(_openai_error("Invalid JSON"), status=400)
    # Valid JSON that is not an object (e.g. a list or a string) has no
    # .get(); reject it instead of failing with an AttributeError.
    if not isinstance(body, dict):
        return web.json_response(_openai_error("Request body must be a JSON object"), status=400)

    raw_input = body.get("input")
    if not raw_input:
        return web.json_response(_openai_error("Missing 'input' field"), status=400)

    if isinstance(raw_input, str):
        user_message = raw_input
    elif isinstance(raw_input, list):
        # raw_input is non-empty here; only an object item can carry "content".
        last_item = raw_input[-1]
        user_message = last_item.get("content", "") if isinstance(last_item, dict) else ""
    else:
        user_message = ""
    if not user_message:
        return web.json_response(_openai_error("No user message found in input"), status=400)

    # Re-check the limit: other requests may have registered runs while this
    # one was awaiting its body. Nothing awaits between here and registration.
    limit_err = _limit_response()
    if limit_err is not None:
        return limit_err

    run_id = f"run_{uuid.uuid4().hex}"
    loop = asyncio.get_running_loop()
    q: "asyncio.Queue[Optional[Dict]]" = asyncio.Queue()
    self._run_streams[run_id] = q
    self._run_streams_created[run_id] = time.time()

    event_cb = self._make_run_event_callback(run_id, loop)

    # Also wire stream_delta_callback so message.delta events flow through
    def _text_cb(delta: Optional[str]) -> None:
        if delta is None:
            return
        try:
            loop.call_soon_threadsafe(q.put_nowait, {
                "event": "message.delta",
                "run_id": run_id,
                "timestamp": time.time(),
                "delta": delta,
            })
        except Exception:
            # Best effort: a failed enqueue must not interrupt the run.
            pass

    instructions = body.get("instructions")
    previous_response_id = body.get("previous_response_id")
    conversation_history: List[Dict[str, str]] = []
    if previous_response_id:
        stored = self._response_store.get(previous_response_id)
        if stored:
            conversation_history = list(stored.get("conversation_history", []))
            # Explicit instructions in this request win over stored ones.
            if instructions is None:
                instructions = stored.get("instructions")

    session_id = body.get("session_id") or run_id
    ephemeral_system_prompt = instructions

    async def _run_and_close():
        try:
            agent = self._create_agent(
                ephemeral_system_prompt=ephemeral_system_prompt,
                session_id=session_id,
                stream_delta_callback=_text_cb,
                tool_progress_callback=event_cb,
            )

            def _run_sync():
                r = agent.run_conversation(
                    user_message=user_message,
                    conversation_history=conversation_history,
                )
                u = {
                    "input_tokens": getattr(agent, "session_prompt_tokens", 0) or 0,
                    "output_tokens": getattr(agent, "session_completion_tokens", 0) or 0,
                    "total_tokens": getattr(agent, "session_total_tokens", 0) or 0,
                }
                return r, u

            # The conversation call is synchronous; run it in the default
            # executor so the event loop stays free.
            result, usage = await loop.run_in_executor(None, _run_sync)
            final_response = result.get("final_response", "") if isinstance(result, dict) else ""
            q.put_nowait({
                "event": "run.completed",
                "run_id": run_id,
                "timestamp": time.time(),
                "output": final_response,
                "usage": usage,
            })
        except Exception as exc:
            logger.exception("[api_server] run %s failed", run_id)
            try:
                q.put_nowait({
                    "event": "run.failed",
                    "run_id": run_id,
                    "timestamp": time.time(),
                    "error": str(exc),
                })
            except Exception:
                pass
        finally:
            # Sentinel: signal SSE stream to close
            try:
                q.put_nowait(None)
            except Exception:
                pass

    task = asyncio.create_task(_run_and_close())
    # Keep a reference to the task for as long as it runs.
    try:
        self._background_tasks.add(task)
    except TypeError:
        pass
    if hasattr(task, "add_done_callback"):
        task.add_done_callback(self._background_tasks.discard)

    return web.json_response({"run_id": run_id, "status": "started"}, status=202)
async def _handle_run_events(self, request: "web.Request") -> "web.StreamResponse":
    """GET /v1/runs/{run_id}/events — SSE stream of structured agent lifecycle events.

    Waits up to about one second (20 polls of 50 ms) for the run to be
    registered, then relays every queued event as a ``data:`` frame until
    the ``None`` sentinel arrives. A comment frame is written as keepalive
    whenever no event arrives for 30 seconds.

    Returns:
        The prepared ``web.StreamResponse``; a 404 JSON response when the
        run is unknown; or the response produced by ``_check_auth``.
    """
    auth_err = self._check_auth(request)
    if auth_err:
        return auth_err

    run_id = request.match_info["run_id"]

    # Allow subscribing slightly before the run is registered (race condition window)
    for _ in range(20):
        if run_id in self._run_streams:
            break
        await asyncio.sleep(0.05)
    else:
        return web.json_response(_openai_error(f"Run not found: {run_id}", code="run_not_found"), status=404)

    q = self._run_streams[run_id]

    response = web.StreamResponse(
        status=200,
        headers={
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )
    await response.prepare(request)

    # A consumer is attached now, so the run is not orphaned. Discard its
    # creation timestamp so the TTL sweep cannot remove the stream from
    # ``_run_streams`` while events are still being relayed. Done after
    # prepare() and directly before the try so the finally below always
    # performs the cleanup.
    self._run_streams_created.pop(run_id, None)

    try:
        while True:
            try:
                event = await asyncio.wait_for(q.get(), timeout=30.0)
            except asyncio.TimeoutError:
                await response.write(b": keepalive\n\n")
                continue
            if event is None:
                # Run finished — send final SSE comment and close
                await response.write(b": stream closed\n\n")
                break
            payload = f"data: {json.dumps(event)}\n\n"
            await response.write(payload.encode())
    except Exception as exc:
        logger.debug("[api_server] SSE stream error for run %s: %s", run_id, exc)
    finally:
        # Unregister the run whether the stream ended normally or with an error.
        self._run_streams.pop(run_id, None)
        self._run_streams_created.pop(run_id, None)

    return response
async def _sweep_orphaned_runs(self) -> None:
    """Drop run streams older than ``_RUN_STREAM_TTL`` once a minute, forever."""
    while True:
        await asyncio.sleep(60)
        checked_at = time.time()
        # Iterate over a snapshot because entries are removed inside the loop.
        for rid, born in tuple(self._run_streams_created.items()):
            if checked_at - born <= self._RUN_STREAM_TTL:
                continue
            logger.debug("[api_server] sweeping orphaned run %s", rid)
            self._run_streams.pop(rid, None)
            self._run_streams_created.pop(rid, None)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# BasePlatformAdapter interface # BasePlatformAdapter interface
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@ -1317,6 +1553,17 @@ class APIServerAdapter(BasePlatformAdapter):
self._app.router.add_post("/api/jobs/{job_id}/pause", self._handle_pause_job) self._app.router.add_post("/api/jobs/{job_id}/pause", self._handle_pause_job)
self._app.router.add_post("/api/jobs/{job_id}/resume", self._handle_resume_job) self._app.router.add_post("/api/jobs/{job_id}/resume", self._handle_resume_job)
self._app.router.add_post("/api/jobs/{job_id}/run", self._handle_run_job) self._app.router.add_post("/api/jobs/{job_id}/run", self._handle_run_job)
# Structured event streaming
self._app.router.add_post("/v1/runs", self._handle_runs)
self._app.router.add_get("/v1/runs/{run_id}/events", self._handle_run_events)
# Start background sweep to clean up orphaned (unconsumed) run streams
sweep_task = asyncio.create_task(self._sweep_orphaned_runs())
try:
self._background_tasks.add(sweep_task)
except TypeError:
pass
if hasattr(sweep_task, "add_done_callback"):
sweep_task.add_done_callback(self._background_tasks.discard)
# Port conflict detection — fail fast if port is already in use # Port conflict detection — fail fast if port is already in use
import socket as _socket import socket as _socket

View file

@ -6000,11 +6000,15 @@ class GatewayRunner:
last_progress_msg = [None] # Track last message for dedup last_progress_msg = [None] # Track last message for dedup
repeat_count = [0] # How many times the same message repeated repeat_count = [0] # How many times the same message repeated
def progress_callback(tool_name: str, preview: str = None, args: dict = None): def progress_callback(event_type: str, tool_name: str = None, preview: str = None, args: dict = None, **kwargs):
"""Callback invoked by agent when a tool is called.""" """Callback invoked by agent on tool lifecycle events."""
if not progress_queue: if not progress_queue:
return return
# Only act on tool.started events (ignore tool.completed, reasoning.available, etc.)
if event_type not in ("tool.started",):
return
# "new" mode: only report when tool changes # "new" mode: only report when tool changes
if progress_mode == "new" and tool_name == last_tool[0]: if progress_mode == "new" and tool_name == last_tool[0]:
return return

View file

@ -6056,7 +6056,7 @@ class AIAgent:
if self.tool_progress_callback: if self.tool_progress_callback:
try: try:
preview = _build_tool_preview(name, args) preview = _build_tool_preview(name, args)
self.tool_progress_callback(name, preview, args) self.tool_progress_callback("tool.started", name, preview, args)
except Exception as cb_err: except Exception as cb_err:
logging.debug(f"Tool progress callback error: {cb_err}") logging.debug(f"Tool progress callback error: {cb_err}")
@ -6121,6 +6121,15 @@ class AIAgent:
result_preview = function_result[:200] if len(function_result) > 200 else function_result result_preview = function_result[:200] if len(function_result) > 200 else function_result
logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview) logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview)
if self.tool_progress_callback:
try:
self.tool_progress_callback(
"tool.completed", function_name, None, None,
duration=tool_duration, is_error=is_error,
)
except Exception as cb_err:
logging.debug(f"Tool progress callback error: {cb_err}")
if self.verbose_logging: if self.verbose_logging:
logging.debug(f"Tool {function_name} completed in {tool_duration:.2f}s") logging.debug(f"Tool {function_name} completed in {tool_duration:.2f}s")
logging.debug(f"Tool result ({len(function_result)} chars): {function_result}") logging.debug(f"Tool result ({len(function_result)} chars): {function_result}")
@ -6220,7 +6229,7 @@ class AIAgent:
if self.tool_progress_callback: if self.tool_progress_callback:
try: try:
preview = _build_tool_preview(function_name, function_args) preview = _build_tool_preview(function_name, function_args)
self.tool_progress_callback(function_name, preview, function_args) self.tool_progress_callback("tool.started", function_name, preview, function_args)
except Exception as cb_err: except Exception as cb_err:
logging.debug(f"Tool progress callback error: {cb_err}") logging.debug(f"Tool progress callback error: {cb_err}")
@ -6407,6 +6416,15 @@ class AIAgent:
if _is_error_result: if _is_error_result:
logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview) logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview)
if self.tool_progress_callback:
try:
self.tool_progress_callback(
"tool.completed", function_name, None, None,
duration=tool_duration, is_error=_is_error_result,
)
except Exception as cb_err:
logging.debug(f"Tool progress callback error: {cb_err}")
if self.verbose_logging: if self.verbose_logging:
logging.debug(f"Tool {function_name} completed in {tool_duration:.2f}s") logging.debug(f"Tool {function_name} completed in {tool_duration:.2f}s")
logging.debug(f"Tool result ({len(function_result)} chars): {function_result}") logging.debug(f"Tool result ({len(function_result)} chars): {function_result}")
@ -8263,21 +8281,25 @@ class AIAgent:
# Notify progress callback of model's thinking (used by subagent # Notify progress callback of model's thinking (used by subagent
# delegation to relay the child's reasoning to the parent display). # delegation to relay the child's reasoning to the parent display).
# Guard: only fire for subagents (_delegate_depth >= 1) to avoid if (assistant_message.content and self.tool_progress_callback):
# spamming gateway platforms with the main agent's every thought.
if (assistant_message.content and self.tool_progress_callback
and getattr(self, '_delegate_depth', 0) > 0):
_think_text = assistant_message.content.strip() _think_text = assistant_message.content.strip()
# Strip reasoning XML tags that shouldn't leak to parent display # Strip reasoning XML tags that shouldn't leak to parent display
_think_text = re.sub( _think_text = re.sub(
r'</?(?:REASONING_SCRATCHPAD|think|reasoning)>', '', _think_text r'</?(?:REASONING_SCRATCHPAD|think|reasoning)>', '', _think_text
).strip() ).strip()
# For subagents: relay first line to parent display (existing behaviour).
# For all agents with a structured callback: emit reasoning.available event.
first_line = _think_text.split('\n')[0][:80] if _think_text else "" first_line = _think_text.split('\n')[0][:80] if _think_text else ""
if first_line: if first_line and getattr(self, '_delegate_depth', 0) > 0:
try: try:
self.tool_progress_callback("_thinking", first_line) self.tool_progress_callback("_thinking", first_line)
except Exception: except Exception:
pass pass
elif _think_text:
try:
self.tool_progress_callback("reasoning.available", "_thinking", _think_text[:500], None)
except Exception:
pass
# Check for incomplete <REASONING_SCRATCHPAD> (opened but never closed) # Check for incomplete <REASONING_SCRATCHPAD> (opened but never closed)
# This means the model ran out of output tokens mid-reasoning — retry up to 2 times # This means the model ran out of output tokens mid-reasoning — retry up to 2 times

View file

@ -52,7 +52,7 @@ class TestToolProgressCallback:
future.result.return_value = None future.result.return_value = None
mock_rcts.return_value = future mock_rcts.return_value = future
cb("terminal", "$ ls -la", {"command": "ls -la"}) cb("tool.started", "terminal", "$ ls -la", {"command": "ls -la"})
# Should have tracked the tool call ID # Should have tracked the tool call ID
assert "terminal" in tool_call_ids assert "terminal" in tool_call_ids
@ -75,7 +75,7 @@ class TestToolProgressCallback:
future.result.return_value = None future.result.return_value = None
mock_rcts.return_value = future mock_rcts.return_value = future
cb("read_file", "Reading /etc/hosts", '{"path": "/etc/hosts"}') cb("tool.started", "read_file", "Reading /etc/hosts", '{"path": "/etc/hosts"}')
assert "read_file" in tool_call_ids assert "read_file" in tool_call_ids
@ -91,7 +91,7 @@ class TestToolProgressCallback:
future.result.return_value = None future.result.return_value = None
mock_rcts.return_value = future mock_rcts.return_value = future
cb("terminal", "$ echo hi", None) cb("tool.started", "terminal", "$ echo hi", None)
assert "terminal" in tool_call_ids assert "terminal" in tool_call_ids
@ -108,8 +108,8 @@ class TestToolProgressCallback:
future.result.return_value = None future.result.return_value = None
mock_rcts.return_value = future mock_rcts.return_value = future
progress_cb("terminal", "$ ls", {"command": "ls"}) progress_cb("tool.started", "terminal", "$ ls", {"command": "ls"})
progress_cb("terminal", "$ pwd", {"command": "pwd"}) progress_cb("tool.started", "terminal", "$ pwd", {"command": "pwd"})
assert len(tool_call_ids["terminal"]) == 2 assert len(tool_call_ids["terminal"]) == 2
step_cb(1, [{"name": "terminal", "result": "ok-1"}]) step_cb(1, [{"name": "terminal", "result": "ok-1"}])

View file

@ -130,7 +130,7 @@ class TestMcpRegistrationE2E:
# 1) Agent fires tool_progress_callback (ToolCallStart) # 1) Agent fires tool_progress_callback (ToolCallStart)
if agent.tool_progress_callback: if agent.tool_progress_callback:
agent.tool_progress_callback( agent.tool_progress_callback(
"terminal", "$ echo hello", {"command": "echo hello"} "tool.started", "terminal", "$ echo hello", {"command": "echo hello"}
) )
# 2) Agent fires step_callback with tool results (ToolCallUpdate) # 2) Agent fires step_callback with tool results (ToolCallUpdate)
@ -197,8 +197,8 @@ class TestMcpRegistrationE2E:
agent = state.agent agent = state.agent
# Fire two tool calls # Fire two tool calls
if agent.tool_progress_callback: if agent.tool_progress_callback:
agent.tool_progress_callback("read_file", "read: /etc/hosts", {"path": "/etc/hosts"}) agent.tool_progress_callback("tool.started", "read_file", "read: /etc/hosts", {"path": "/etc/hosts"})
agent.tool_progress_callback("web_search", "web search: test", {"query": "test"}) agent.tool_progress_callback("tool.started", "web_search", "web search: test", {"query": "test"})
if agent.step_callback: if agent.step_callback:
agent.step_callback(1, [ agent.step_callback(1, [

View file

@ -96,7 +96,7 @@ class TestBuildChildProgressCallback:
cb = _build_child_progress_callback(0, parent) cb = _build_child_progress_callback(0, parent)
assert cb is not None assert cb is not None
cb("web_search", "quantum computing") cb("tool.started", "web_search", "quantum computing", {})
output = buf.getvalue() output = buf.getvalue()
assert "web_search" in output assert "web_search" in output
assert "quantum computing" in output assert "quantum computing" in output
@ -131,11 +131,11 @@ class TestBuildChildProgressCallback:
# Send 4 tool calls — shouldn't flush yet (BATCH_SIZE = 5) # Send 4 tool calls — shouldn't flush yet (BATCH_SIZE = 5)
for i in range(4): for i in range(4):
cb(f"tool_{i}", f"arg_{i}") cb("tool.started", f"tool_{i}", f"arg_{i}", {})
parent_cb.assert_not_called() parent_cb.assert_not_called()
# 5th call should trigger flush # 5th call should trigger flush
cb("tool_4", "arg_4") cb("tool.started", "tool_4", "arg_4", {})
parent_cb.assert_called_once() parent_cb.assert_called_once()
call_args = parent_cb.call_args call_args = parent_cb.call_args
assert "tool_0" in call_args[0][1] assert "tool_0" in call_args[0][1]
@ -207,7 +207,7 @@ class TestBuildChildProgressCallback:
parent.tool_progress_callback = None parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent, task_count=1) cb = _build_child_progress_callback(0, parent, task_count=1)
cb("web_search", "test") cb("tool.started", "web_search", "test", {})
output = buf.getvalue() output = buf.getvalue()
assert "[" not in output assert "[" not in output
@ -330,9 +330,9 @@ class TestBatchFlush:
cb = _build_child_progress_callback(0, parent) cb = _build_child_progress_callback(0, parent)
# Send 3 tools (below batch size of 5) # Send 3 tools (below batch size of 5)
cb("web_search", "query1") cb("tool.started", "web_search", "query1", {})
cb("read_file", "file.txt") cb("tool.started", "read_file", "file.txt", {})
cb("write_file", "out.txt") cb("tool.started", "write_file", "out.txt", {})
parent_cb.assert_not_called() parent_cb.assert_not_called()
# Flush should send the remaining 3 # Flush should send the remaining 3
@ -365,7 +365,7 @@ class TestBatchFlush:
parent.tool_progress_callback = None parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, parent) cb = _build_child_progress_callback(0, parent)
cb("web_search", "test") cb("tool.started", "web_search", "test", {})
cb._flush() # Should not crash cb._flush() # Should not crash

View file

@ -60,9 +60,9 @@ class FakeAgent:
self.tools = [] self.tools = []
def run_conversation(self, message, conversation_history=None, task_id=None): def run_conversation(self, message, conversation_history=None, task_id=None):
self.tool_progress_callback("terminal", "pwd") self.tool_progress_callback("tool.started", "terminal", "pwd", {})
time.sleep(0.35) time.sleep(0.35)
self.tool_progress_callback("browser_navigate", "https://example.com") self.tool_progress_callback("tool.started", "browser_navigate", "https://example.com", {})
time.sleep(0.35) time.sleep(0.35)
return { return {
"final_response": "done", "final_response": "done",

View file

@ -98,11 +98,15 @@ def _build_child_progress_callback(task_index: int, parent_agent, task_count: in
_BATCH_SIZE = 5 _BATCH_SIZE = 5
_batch: List[str] = [] _batch: List[str] = []
def _callback(tool_name: str, preview: str = None): def _callback(event_type: str, tool_name: str = None, preview: str = None, args=None, **kwargs):
# Special "_thinking" event: model produced text content (reasoning) # event_type is one of: "tool.started", "tool.completed",
if tool_name == "_thinking": # "reasoning.available", "_thinking", "subagent_progress"
# "_thinking" / reasoning events
if event_type in ("_thinking", "reasoning.available"):
text = preview or tool_name or ""
if spinner: if spinner:
short = (preview[:55] + "...") if preview and len(preview) > 55 else (preview or "") short = (text[:55] + "...") if len(text) > 55 else text
try: try:
spinner.print_above(f" {prefix}├─ 💭 \"{short}\"") spinner.print_above(f" {prefix}├─ 💭 \"{short}\"")
except Exception as e: except Exception as e:
@ -110,11 +114,15 @@ def _build_child_progress_callback(task_index: int, parent_agent, task_count: in
# Don't relay thinking to gateway (too noisy for chat) # Don't relay thinking to gateway (too noisy for chat)
return return
# Regular tool call event # tool.completed — no display needed here (spinner shows on started)
if event_type == "tool.completed":
return
# tool.started — display and batch for parent relay
if spinner: if spinner:
short = (preview[:35] + "...") if preview and len(preview) > 35 else (preview or "") short = (preview[:35] + "...") if preview and len(preview) > 35 else (preview or "")
from agent.display import get_tool_emoji from agent.display import get_tool_emoji
emoji = get_tool_emoji(tool_name) emoji = get_tool_emoji(tool_name or "")
line = f" {prefix}├─ {emoji} {tool_name}" line = f" {prefix}├─ {emoji} {tool_name}"
if short: if short:
line += f" \"{short}\"" line += f" \"{short}\""
@ -124,7 +132,7 @@ def _build_child_progress_callback(task_index: int, parent_agent, task_count: in
logger.debug("Spinner print_above failed: %s", e) logger.debug("Spinner print_above failed: %s", e)
if parent_cb: if parent_cb:
_batch.append(tool_name) _batch.append(tool_name or "")
if len(_batch) >= _BATCH_SIZE: if len(_batch) >= _BATCH_SIZE:
summary = ", ".join(_batch) summary = ", ".join(_batch)
try: try: