feat: add streaming LLM response support across all platforms

Cherry-picked from PR #828, resolved conflicts with main.
This commit is contained in:
teknium1 2026-03-11 08:56:37 -07:00
parent b2a4092783
commit 95d221c31c
6 changed files with 696 additions and 22 deletions

View file

@ -173,6 +173,7 @@ class APIServerAdapter(BasePlatformAdapter):
self,
ephemeral_system_prompt: Optional[str] = None,
session_id: Optional[str] = None,
stream_callback=None,
) -> Any:
"""
Create an AIAgent instance using the gateway's runtime config.
@ -213,6 +214,7 @@ class APIServerAdapter(BasePlatformAdapter):
ephemeral_system_prompt=ephemeral_system_prompt or None,
session_id=session_id,
platform="api_server",
stream_callback=stream_callback,
)
return agent
@ -298,8 +300,31 @@ class APIServerAdapter(BasePlatformAdapter):
status=400,
)
# Run the agent in an executor (run_conversation is synchronous)
session_id = str(uuid.uuid4())
completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
model_name = body.get("model", "hermes-agent")
created = int(time.time())
if stream:
import queue as _q
_stream_q = _q.Queue()
def _on_api_token(delta):
_stream_q.put(delta) # None = done
# Start agent in background
agent_task = asyncio.ensure_future(self._run_agent(
user_message=user_message,
conversation_history=history,
ephemeral_system_prompt=system_prompt,
session_id=session_id,
stream_callback=_on_api_token,
))
return await self._write_real_sse_chat_completion(
request, completion_id, model_name, created, _stream_q, agent_task
)
# Non-streaming: run the agent and return full response
try:
result, usage = await self._run_agent(
user_message=user_message,
@ -318,18 +343,6 @@ class APIServerAdapter(BasePlatformAdapter):
if not final_response:
final_response = result.get("error", "(No response generated)")
completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
model_name = body.get("model", "hermes-agent")
created = int(time.time())
if stream:
# Pseudo-streaming: return the full response as SSE chunks.
# Not true token-by-token streaming, but compatible with clients
# (like Open WebUI) that expect SSE format.
return await self._write_sse_chat_completion(
request, completion_id, model_name, created, final_response, usage
)
response_data = {
"id": completion_id,
"object": "chat.completion",
@ -354,6 +367,71 @@ class APIServerAdapter(BasePlatformAdapter):
return web.json_response(response_data)
async def _write_real_sse_chat_completion(
    self, request: "web.Request", completion_id: str, model: str,
    created: int, stream_q, agent_task,
) -> "web.StreamResponse":
    """Stream an OpenAI-compatible SSE chat completion from a live token queue.

    Token deltas are produced by the agent's ``stream_callback`` (running in a
    thread executor) and pushed onto ``stream_q``; a ``None`` sentinel marks
    the end of the stream.

    Args:
        request: Incoming aiohttp request the SSE response is attached to.
        completion_id: ``chatcmpl-...`` id echoed in every chunk.
        model: Model name echoed in every chunk.
        created: Unix creation timestamp echoed in every chunk.
        stream_q: Thread-safe ``queue.Queue`` of ``str`` deltas, terminated
            by a ``None`` sentinel.
        agent_task: asyncio future running the agent; awaited at the end for
            usage statistics.

    Returns:
        The prepared ``web.StreamResponse`` after ``data: [DONE]`` is written.
    """
    import queue as _q
    response = web.StreamResponse(
        status=200,
        headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache"},
    )
    await response.prepare(request)

    def _sse_chunk(delta: dict, finish_reason=None, **extra) -> bytes:
        # Serialize one chat.completion.chunk event in OpenAI SSE framing.
        payload = {
            "id": completion_id, "object": "chat.completion.chunk",
            "created": created, "model": model,
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
            **extra,
        }
        return f"data: {json.dumps(payload)}\n\n".encode()

    # Initial role chunk, per the OpenAI streaming protocol.
    await response.write(_sse_chunk({"role": "assistant"}))

    # Pull deltas off the thread-safe queue via an executor so the blocking
    # Queue.get never stalls the event loop.
    # NOTE: get_event_loop() is deprecated inside coroutines (3.10+).
    loop = asyncio.get_running_loop()
    while True:
        try:
            delta = await loop.run_in_executor(None, lambda: stream_q.get(timeout=0.5))
        except _q.Empty:
            if agent_task.done():
                # Drain tokens enqueued between the get() timeout and the
                # done() check so trailing output is not silently dropped.
                while True:
                    try:
                        delta = stream_q.get_nowait()
                    except _q.Empty:
                        break
                    if delta is None:
                        break
                    await response.write(_sse_chunk({"content": delta}))
                break
            continue
        if delta is None:  # Sentinel: agent finished producing tokens.
            break
        await response.write(_sse_chunk({"content": delta}))

    # Best-effort usage collection: the stream already succeeded, so a failed
    # agent task must not escalate into an exception mid-response.
    usage = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
    try:
        _result, agent_usage = await agent_task
        usage = agent_usage or usage
    except Exception:
        pass

    # Final chunk carries finish_reason and token accounting.
    await response.write(_sse_chunk(
        {},
        finish_reason="stop",
        usage={
            "prompt_tokens": usage.get("input_tokens", 0),
            "completion_tokens": usage.get("output_tokens", 0),
            "total_tokens": usage.get("total_tokens", 0),
        },
    ))
    await response.write(b"data: [DONE]\n\n")
    return response
async def _write_sse_chat_completion(
self, request: "web.Request", completion_id: str, model: str,
created: int, content: str, usage: Dict[str, int],
@ -671,6 +749,7 @@ class APIServerAdapter(BasePlatformAdapter):
conversation_history: List[Dict[str, str]],
ephemeral_system_prompt: Optional[str] = None,
session_id: Optional[str] = None,
stream_callback=None,
) -> tuple:
"""
Create an agent and run a conversation in a thread executor.
@ -684,6 +763,7 @@ class APIServerAdapter(BasePlatformAdapter):
agent = self._create_agent(
ephemeral_system_prompt=ephemeral_system_prompt,
session_id=session_id,
stream_callback=stream_callback,
)
result = agent.run_conversation(
user_message=user_message,