fix(api-server): persist response snapshot on client disconnect when store=True

This commit is contained in:
UgwujaGeorge 2026-04-24 14:47:48 +01:00 committed by Teknium
parent 7957da7a1d
commit a29bad2a3c

View file

@ -1204,10 +1204,12 @@ class APIServerAdapter(BasePlatformAdapter):
If the client disconnects mid-stream, ``agent.interrupt()`` is If the client disconnects mid-stream, ``agent.interrupt()`` is
called so the agent stops issuing upstream LLM calls, then the called so the agent stops issuing upstream LLM calls, then the
asyncio task is cancelled. When ``store=True`` the full response asyncio task is cancelled. When ``store=True`` an initial
is persisted to the ResponseStore in a ``finally`` block so GET ``in_progress`` snapshot is persisted immediately after
/v1/responses/{id} and ``previous_response_id`` chaining work the ``response.created`` event, and a client disconnect updates it to an
same as the batch path. ``incomplete`` snapshot so GET /v1/responses/{id} and
``previous_response_id`` chaining still have something to
recover from.
""" """
import queue as _q import queue as _q
@ -1269,6 +1271,26 @@ class APIServerAdapter(BasePlatformAdapter):
final_response_text = "" final_response_text = ""
agent_error: Optional[str] = None agent_error: Optional[str] = None
usage: Dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} usage: Dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
terminal_snapshot_persisted = False
def _persist_response_snapshot(
    response_env: Dict[str, Any],
    *,
    conversation_history_snapshot: Optional[List[Dict[str, Any]]] = None,
) -> None:
    """Persist ``response_env`` to the ResponseStore under ``response_id``.

    No-op unless ``store`` is truthy (closure over the request's
    ``store`` flag). When ``conversation_history_snapshot`` is not
    supplied, a default snapshot is built from the enclosing scope's
    ``conversation_history`` plus the current user message; callers on
    the failed/completed/disconnect paths pass a snapshot that already
    includes the user (and, where available, assistant) turns.
    """
    if not store:
        return
    if conversation_history_snapshot is None:
        # NOTE(review): indentation reconstructed from a diff view — the
        # user-message append is assumed to belong inside this branch,
        # since every caller that supplies its own snapshot appends the
        # user message itself; confirm against the original file.
        conversation_history_snapshot = list(conversation_history)
        conversation_history_snapshot.append({"role": "user", "content": user_message})
    self._response_store.put(response_id, {
        "response": response_env,
        "conversation_history": conversation_history_snapshot,
        "instructions": instructions,
        "session_id": session_id,
    })
    # Also map the conversation id to this response so subsequent
    # ``previous_response_id`` chaining resolves to the latest snapshot.
    if conversation:
        self._response_store.set_conversation(conversation, response_id)
try: try:
# response.created — initial envelope, status=in_progress # response.created — initial envelope, status=in_progress
@ -1278,6 +1300,7 @@ class APIServerAdapter(BasePlatformAdapter):
"type": "response.created", "type": "response.created",
"response": created_env, "response": created_env,
}) })
_persist_response_snapshot(created_env)
last_activity = time.monotonic() last_activity = time.monotonic()
async def _open_message_item() -> None: async def _open_message_item() -> None:
@ -1534,6 +1557,18 @@ class APIServerAdapter(BasePlatformAdapter):
"output_tokens": usage.get("output_tokens", 0), "output_tokens": usage.get("output_tokens", 0),
"total_tokens": usage.get("total_tokens", 0), "total_tokens": usage.get("total_tokens", 0),
} }
_failed_history = list(conversation_history)
_failed_history.append({"role": "user", "content": user_message})
if final_response_text or agent_error:
_failed_history.append({
"role": "assistant",
"content": final_response_text or agent_error,
})
_persist_response_snapshot(
failed_env,
conversation_history_snapshot=_failed_history,
)
terminal_snapshot_persisted = True
await _write_event("response.failed", { await _write_event("response.failed", {
"type": "response.failed", "type": "response.failed",
"response": failed_env, "response": failed_env,
@ -1546,30 +1581,47 @@ class APIServerAdapter(BasePlatformAdapter):
"output_tokens": usage.get("output_tokens", 0), "output_tokens": usage.get("output_tokens", 0),
"total_tokens": usage.get("total_tokens", 0), "total_tokens": usage.get("total_tokens", 0),
} }
full_history = list(conversation_history)
full_history.append({"role": "user", "content": user_message})
if isinstance(result, dict) and result.get("messages"):
full_history.extend(result["messages"])
else:
full_history.append({"role": "assistant", "content": final_response_text})
_persist_response_snapshot(
completed_env,
conversation_history_snapshot=full_history,
)
terminal_snapshot_persisted = True
await _write_event("response.completed", { await _write_event("response.completed", {
"type": "response.completed", "type": "response.completed",
"response": completed_env, "response": completed_env,
}) })
# Persist for future chaining / GET retrieval, mirroring
# the batch path behavior.
if store:
full_history = list(conversation_history)
full_history.append({"role": "user", "content": user_message})
if isinstance(result, dict) and result.get("messages"):
full_history.extend(result["messages"])
else:
full_history.append({"role": "assistant", "content": final_response_text})
self._response_store.put(response_id, {
"response": completed_env,
"conversation_history": full_history,
"instructions": instructions,
"session_id": session_id,
})
if conversation:
self._response_store.set_conversation(conversation, response_id)
except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError, OSError): except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError, OSError):
if store and not terminal_snapshot_persisted:
incomplete_text = "".join(final_text_parts) or final_response_text
incomplete_items: List[Dict[str, Any]] = list(emitted_items)
if incomplete_text:
incomplete_items.append({
"type": "message",
"role": "assistant",
"content": [{"type": "output_text", "text": incomplete_text}],
})
incomplete_env = _envelope("incomplete")
incomplete_env["output"] = incomplete_items
incomplete_env["usage"] = {
"input_tokens": usage.get("input_tokens", 0),
"output_tokens": usage.get("output_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
}
incomplete_history = list(conversation_history)
incomplete_history.append({"role": "user", "content": user_message})
if incomplete_text:
incomplete_history.append({"role": "assistant", "content": incomplete_text})
_persist_response_snapshot(
incomplete_env,
conversation_history_snapshot=incomplete_history,
)
# Client disconnected — interrupt the agent so it stops # Client disconnected — interrupt the agent so it stops
# making upstream LLM calls, then cancel the task. # making upstream LLM calls, then cancel the task.
agent = agent_ref[0] if agent_ref else None agent = agent_ref[0] if agent_ref else None