feat: add streaming token output and simplify CLI to plain stdout

This commit is contained in:
Brooklyn Nicholson 2026-03-09 17:33:28 -05:00
parent c754135965
commit 4d6c90c6d0
7 changed files with 599 additions and 982 deletions

View file

@@ -319,7 +319,7 @@ class SendResult:
raw_response: Any = None
# Type for message handlers
# Handler may return str (sent by base) or dict(content=..., already_sent=True).
MessageHandler = Callable[[MessageEvent], Awaitable[Optional[str]]]
@@ -691,11 +691,20 @@ class BasePlatformAdapter(ABC):
try:
# Call the handler (this can take a while with tool calls)
response = await self._message_handler(event)
handler_result = await self._message_handler(event)
# Normalise: handler may return str or dict(content, already_sent)
already_sent = False
if isinstance(handler_result, dict):
response = handler_result.get("content") or ""
already_sent = handler_result.get("already_sent", False)
else:
response = handler_result
# Send response if any
if not response:
logger.warning("[%s] Handler returned empty/None response for %s", self.name, event.source.chat_id)
if not already_sent:
logger.warning("[%s] Handler returned empty/None response for %s", self.name, event.source.chat_id)
if response:
# Extract MEDIA:<path> tags (from TTS tool) before other processing
media_files, response = self.extract_media(response)
@@ -706,7 +715,7 @@ class BasePlatformAdapter(ABC):
logger.info("[%s] extract_images found %d image(s) in response (%d chars)", self.name, len(images), len(response))
# Send the text portion first (if any remains after extractions)
if text_content:
if text_content and not already_sent:
logger.info("[%s] Sending response (%d chars) to %s", self.name, len(text_content), event.source.chat_id)
result = await self.send(
chat_id=event.source.chat_id,

View file

@@ -1291,7 +1291,9 @@ class GatewayRunner:
# Update session
self.session_store.update_session(session_entry.session_key)
if agent_result.get("already_sent"):
return {"content": response, "already_sent": True}
return response
except Exception as e:
@@ -2450,6 +2452,83 @@ class GatewayRunner:
# Queue for progress messages (thread-safe)
progress_queue = queue.Queue() if tool_progress_enabled else None
last_tool = [None] # Mutable container for tracking in closure
# Streaming token queue — same pattern as progress_queue but for
# assistant text deltas. An async drain task sends/edits a single
# platform message with the accumulated text.
stream_queue = queue.Queue()
stream_sent = [False] # set True once any delta was delivered
def _stream_delta(text: str) -> None:
    """Thread-safe callback: enqueue an assistant text delta for the drain task.

    Invoked by the agent (possibly from a worker thread); the async
    send_stream_messages task drains stream_queue and delivers the text.
    """
    stream_queue.put(text)
async def send_stream_messages():
    """Drain stream_queue and deliver accumulated text via send/edit_message.

    The first delta triggers a platform `send`; later deltas edit that
    same message in place, throttled to at most one edit per
    EDIT_INTERVAL seconds.  On cancellation, any queued or coalesced
    text is flushed in one final send/edit.

    Side effects: sets stream_sent[0] = True once any delta arrives, so
    the caller can mark the response as already delivered.
    """
    _adapter = self.adapters.get(source.platform)
    if not _adapter:
        return
    accumulated = []     # all deltas received so far, in order
    pending = False      # True when accumulated text has not yet been delivered
    msg_id = None        # platform message id once the first send succeeds
    can_edit = True      # cleared after the first failed edit (adapter may not support it)
    last_edit_ts = 0.0
    EDIT_INTERVAL = 0.6  # seconds between edits (rate-limit safe)

    async def _flush():
        """Send or edit the platform message with the full accumulated text."""
        nonlocal msg_id, can_edit, last_edit_ts, pending
        full_text = "".join(accumulated)
        if msg_id is None:
            res = await _adapter.send(
                chat_id=source.chat_id, content=full_text)
            if res.success and res.message_id:
                msg_id = res.message_id
        elif can_edit:
            res = await _adapter.edit_message(
                chat_id=source.chat_id,
                message_id=msg_id,
                content=full_text,
            )
            if not res.success:
                can_edit = False
        last_edit_ts = asyncio.get_running_loop().time()
        pending = False

    while True:
        try:
            delta = stream_queue.get_nowait()
            accumulated.append(delta)
            stream_sent[0] = True
            pending = True
            now = asyncio.get_running_loop().time()
            if now - last_edit_ts < EDIT_INTERVAL:
                # Coalesce — flushed once the interval elapses
                await asyncio.sleep(0.05)
                continue
            await _flush()
        except queue.Empty:
            # BUGFIX: flush coalesced text when the queue goes quiet.
            # Previously, deltas deferred by the rate limit were never
            # delivered until the NEXT delta arrived (or cancellation),
            # so mid-stream text could stall indefinitely.
            if pending and (
                asyncio.get_running_loop().time() - last_edit_ts
                >= EDIT_INTERVAL
            ):
                await _flush()
            await asyncio.sleep(0.15)
        except asyncio.CancelledError:
            # Final flush: drain whatever is still queued, then deliver once.
            while not stream_queue.empty():
                try:
                    accumulated.append(stream_queue.get_nowait())
                except Exception:
                    break
            if accumulated:
                full_text = "".join(accumulated)
                # Guard both paths — the original only guarded the edit
                # path, so a failing send could mask the cancellation.
                try:
                    if msg_id is None:
                        await _adapter.send(
                            chat_id=source.chat_id, content=full_text)
                    elif can_edit:
                        await _adapter.edit_message(
                            chat_id=source.chat_id,
                            message_id=msg_id,
                            content=full_text,
                        )
                except Exception:
                    pass
            return
        except Exception as e:
            logger.error("Stream message error: %s", e)
            await asyncio.sleep(0.5)
def progress_callback(tool_name: str, preview: str = None, args: dict = None):
"""Callback invoked by agent when a tool is called."""
@@ -2693,6 +2772,7 @@ class GatewayRunner:
session_id=session_id,
tool_progress_callback=progress_callback if tool_progress_enabled else None,
step_callback=_step_callback_sync if _hooks_ref.loaded_hooks else None,
stream_delta_callback=_stream_delta,
platform=platform_key,
honcho_session_key=session_key,
session_db=self._session_db,
@@ -2815,12 +2895,16 @@ class GatewayRunner:
"api_calls": result_holder[0].get("api_calls", 0) if result_holder[0] else 0,
"tools": tools_holder[0] or [],
"history_offset": len(agent_history),
"already_sent": stream_sent[0],
}
# Start progress message sender if enabled
progress_task = None
if tool_progress_enabled:
progress_task = asyncio.create_task(send_progress_messages())
# Start stream message sender
stream_task = asyncio.create_task(send_stream_messages())
# Track this agent as running for this session (for interrupt support)
# We do this in a callback after the agent is created
@@ -2896,9 +2980,10 @@ class GatewayRunner:
session_key=session_key
)
finally:
# Stop progress sender and interrupt monitor
# Stop progress sender, stream sender, and interrupt monitor
if progress_task:
progress_task.cancel()
stream_task.cancel()
interrupt_monitor.cancel()
# Clean up tracking
@@ -2907,7 +2992,7 @@ class GatewayRunner:
del self._running_agents[session_key]
# Wait for cancelled tasks
for task in [progress_task, interrupt_monitor, tracking_task]:
for task in [progress_task, stream_task, interrupt_monitor, tracking_task]:
if task:
try:
await task