diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index a6b52ff32..37b7121a5 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -1204,10 +1204,12 @@ class APIServerAdapter(BasePlatformAdapter): If the client disconnects mid-stream, ``agent.interrupt()`` is called so the agent stops issuing upstream LLM calls, then the - asyncio task is cancelled. When ``store=True`` the full response - is persisted to the ResponseStore in a ``finally`` block so GET - /v1/responses/{id} and ``previous_response_id`` chaining work the - same as the batch path. + asyncio task is cancelled. When ``store=True`` an initial + ``in_progress`` snapshot is persisted immediately after + ``response.created`` and disconnects update it to an + ``incomplete`` snapshot so GET /v1/responses/{id} and + ``previous_response_id`` chaining still have something to + recover from. """ import queue as _q @@ -1269,6 +1271,26 @@ class APIServerAdapter(BasePlatformAdapter): final_response_text = "" agent_error: Optional[str] = None usage: Dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} + terminal_snapshot_persisted = False + + def _persist_response_snapshot( + response_env: Dict[str, Any], + *, + conversation_history_snapshot: Optional[List[Dict[str, Any]]] = None, + ) -> None: + if not store: + return + if conversation_history_snapshot is None: + conversation_history_snapshot = list(conversation_history) + conversation_history_snapshot.append({"role": "user", "content": user_message}) + self._response_store.put(response_id, { + "response": response_env, + "conversation_history": conversation_history_snapshot, + "instructions": instructions, + "session_id": session_id, + }) + if conversation: + self._response_store.set_conversation(conversation, response_id) try: # response.created — initial envelope, status=in_progress @@ -1278,6 +1300,7 @@ class APIServerAdapter(BasePlatformAdapter): "type": "response.created", "response": created_env, }) + _persist_response_snapshot(created_env) last_activity = time.monotonic() async def _open_message_item() -> None: @@ -1534,6 +1557,18 @@ class APIServerAdapter(BasePlatformAdapter): "output_tokens": usage.get("output_tokens", 0), "total_tokens": usage.get("total_tokens", 0), } + _failed_history = list(conversation_history) + _failed_history.append({"role": "user", "content": user_message}) + if final_response_text or agent_error: + _failed_history.append({ + "role": "assistant", + "content": final_response_text or agent_error, + }) + _persist_response_snapshot( + failed_env, + conversation_history_snapshot=_failed_history, + ) + terminal_snapshot_persisted = True await _write_event("response.failed", { "type": "response.failed", "response": failed_env, @@ -1546,30 +1581,47 @@ class APIServerAdapter(BasePlatformAdapter): "output_tokens": usage.get("output_tokens", 0), "total_tokens": usage.get("total_tokens", 0), } + full_history = list(conversation_history) + full_history.append({"role": "user", "content": user_message}) + if isinstance(result, dict) and result.get("messages"): + full_history.extend(result["messages"]) + else: + full_history.append({"role": "assistant", "content": final_response_text}) + _persist_response_snapshot( + completed_env, + conversation_history_snapshot=full_history, + ) + terminal_snapshot_persisted = True await _write_event("response.completed", { "type": "response.completed", "response": completed_env, }) - # Persist for future chaining / GET retrieval, mirroring - # the batch path behavior. - if store: - full_history = list(conversation_history) - full_history.append({"role": "user", "content": user_message}) - if isinstance(result, dict) and result.get("messages"): - full_history.extend(result["messages"]) - else: - full_history.append({"role": "assistant", "content": final_response_text}) - self._response_store.put(response_id, { - "response": completed_env, - "conversation_history": full_history, - "instructions": instructions, - "session_id": session_id, - }) - if conversation: - self._response_store.set_conversation(conversation, response_id) - except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError, OSError): + if store and not terminal_snapshot_persisted: + incomplete_text = "".join(final_text_parts) or final_response_text + incomplete_items: List[Dict[str, Any]] = list(emitted_items) + if incomplete_text: + incomplete_items.append({ + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": incomplete_text}], + }) + incomplete_env = _envelope("incomplete") + incomplete_env["output"] = incomplete_items + incomplete_env["usage"] = { + "input_tokens": usage.get("input_tokens", 0), + "output_tokens": usage.get("output_tokens", 0), + "total_tokens": usage.get("total_tokens", 0), + } + incomplete_history = list(conversation_history) + incomplete_history.append({"role": "user", "content": user_message}) + if incomplete_text: + incomplete_history.append({"role": "assistant", "content": incomplete_text}) + _persist_response_snapshot( + incomplete_env, + conversation_history_snapshot=incomplete_history, + ) # Client disconnected — interrupt the agent so it stops # making upstream LLM calls, then cancel the task. agent = agent_ref[0] if agent_ref else None