From 8a96fa48c10d7c06db07b70d53b2b489e9add2a3 Mon Sep 17 00:00:00 2001 From: thelumiereguy Date: Sun, 3 May 2026 00:17:49 +0200 Subject: [PATCH] fix(gateway): avoid duplicated responses history --- gateway/platforms/api_server.py | 98 ++++++++++++--- tests/gateway/test_api_server.py | 205 +++++++++++++++++++++++++++++++ 2 files changed, 283 insertions(+), 20 deletions(-) diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index ae77100f6a..0b404af812 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -1888,12 +1888,12 @@ class APIServerAdapter(BasePlatformAdapter): "output_tokens": usage.get("output_tokens", 0), "total_tokens": usage.get("total_tokens", 0), } - full_history = list(conversation_history) - full_history.append({"role": "user", "content": user_message}) - if isinstance(result, dict) and result.get("messages"): - full_history.extend(result["messages"]) - else: - full_history.append({"role": "assistant", "content": final_response_text}) + full_history = self._build_response_conversation_history( + conversation_history, + user_message, + result, + final_response_text, + ) _persist_response_snapshot( completed_env, conversation_history_snapshot=full_history, @@ -2192,17 +2192,22 @@ class APIServerAdapter(BasePlatformAdapter): # Build the full conversation history for storage # (includes tool calls from the agent run) - full_history = list(conversation_history) - full_history.append({"role": "user", "content": user_message}) - # Add agent's internal messages if available - agent_messages = result.get("messages", []) - if agent_messages: - full_history.extend(agent_messages) - else: - full_history.append({"role": "assistant", "content": final_response}) + full_history = self._build_response_conversation_history( + conversation_history, + user_message, + result, + final_response, + ) - # Build output items (includes tool calls + final message) - output_items = self._extract_output_items(result) + # Build output items from the current turn only. AIAgent returns a + # full transcript in result["messages"], while older/mocked paths may + # return only the current turn suffix. + output_start_index = self._response_messages_turn_start_index( + conversation_history, + user_message, + result, + ) + output_items = self._extract_output_items(result, start_index=output_start_index) response_data = { "id": response_id, @@ -2494,17 +2499,70 @@ class APIServerAdapter(BasePlatformAdapter): # ------------------------------------------------------------------ @staticmethod - def _extract_output_items(result: Dict[str, Any]) -> List[Dict[str, Any]]: - """ - Build the full output item array from the agent's messages. + def _build_response_conversation_history( + conversation_history: List[Dict[str, Any]], + user_message: Any, + result: Dict[str, Any], + final_response: Any, + ) -> List[Dict[str, Any]]: + """Build the stored Responses transcript without duplicating history.""" + prior = list(conversation_history) + current_user = {"role": "user", "content": user_message} + agent_messages = result.get("messages") if isinstance(result, dict) else None - Walks *result["messages"]* and emits: + if isinstance(agent_messages, list) and agent_messages: + turn_start = APIServerAdapter._response_messages_turn_start_index( + conversation_history, + user_message, + result, + ) + if turn_start: + return list(agent_messages) + + full_history = prior + full_history.append(current_user) + full_history.extend(agent_messages) + return full_history + + full_history = prior + full_history.append(current_user) + full_history.append({"role": "assistant", "content": final_response}) + return full_history + + @staticmethod + def _response_messages_turn_start_index( + conversation_history: List[Dict[str, Any]], + user_message: Any, + result: Dict[str, Any], + ) -> int: + """Detect transcript-shaped result["messages"] and return turn start.""" + agent_messages = result.get("messages") if isinstance(result, dict) else None + if not isinstance(agent_messages, list) or not agent_messages: + return 0 + + prior = list(conversation_history) + current_user = {"role": "user", "content": user_message} + expected_prefix = prior + [current_user] + if agent_messages[:len(expected_prefix)] == expected_prefix: + return len(expected_prefix) + if prior and agent_messages[:len(prior)] == prior: + return len(prior) + return 0 + + @staticmethod + def _extract_output_items(result: Dict[str, Any], start_index: int = 0) -> List[Dict[str, Any]]: + """ + Build the output item array from the agent's messages. + + Walks *result["messages"]* starting at *start_index* and emits: - ``function_call`` items for each tool_call on assistant messages - ``function_call_output`` items for each tool-role message - a final ``message`` item with the assistant's text reply """ items: List[Dict[str, Any]] = [] messages = result.get("messages", []) + if start_index > 0: + messages = messages[start_index:] for msg in messages: role = msg.get("role") diff --git a/tests/gateway/test_api_server.py b/tests/gateway/test_api_server.py index 2bf539041e..150ae11261 100644 --- a/tests/gateway/test_api_server.py +++ b/tests/gateway/test_api_server.py @@ -1360,6 +1360,146 @@ class TestResponsesEndpoint: assert len(call_kwargs["conversation_history"]) > 0 assert call_kwargs["user_message"] == "Now add 1 more" + @pytest.mark.asyncio + async def test_previous_response_id_stores_full_agent_transcript_once(self, adapter): + """Chained Responses storage must not append result["messages"] twice.""" + first_history = [ + {"role": "user", "content": "What is 1+1?"}, + {"role": "assistant", "content": "2"}, + ] + + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = ( + { + "final_response": "2", + "messages": list(first_history), + "api_calls": 1, + }, + {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}, + ) + resp1 = await cli.post( + "/v1/responses", + json={"model": "hermes-agent", "input": "What is 1+1?"}, + ) + + assert resp1.status == 200 + resp1_data = await resp1.json() + stored_first = adapter._response_store.get(resp1_data["id"]) + assert stored_first["conversation_history"] == first_history + + second_history = first_history + [ + {"role": "user", "content": "Now add 1 more"}, + {"role": "assistant", "content": "3"}, + ] + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = ( + { + "final_response": "3", + "messages": list(second_history), + "api_calls": 1, + }, + {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}, + ) + resp2 = await cli.post( + "/v1/responses", + json={ + "model": "hermes-agent", + "input": "Now add 1 more", + "previous_response_id": resp1_data["id"], + }, + ) + + assert resp2.status == 200 + resp2_data = await resp2.json() + stored_second = adapter._response_store.get(resp2_data["id"]) + stored_history = stored_second["conversation_history"] + assert stored_history == second_history + assert stored_history.count(first_history[0]) == 1 + assert stored_history.count({"role": "user", "content": "Now add 1 more"}) == 1 + + @pytest.mark.asyncio + async def test_previous_response_id_outputs_only_current_turn_items(self, adapter): + """Response output must not replay previous tool artifacts.""" + prior_history = [ + {"role": "user", "content": "Read old file"}, + { + "role": "assistant", + "tool_calls": [ + { + "id": "call_old", + "function": { + "name": "read_file", + "arguments": '{"path":"old.txt"}', + }, + } + ], + }, + { + "role": "tool", + "tool_call_id": "call_old", + "content": '{"content":"old"}', + }, + {"role": "assistant", "content": "old"}, + ] + adapter._response_store.put( + "resp_prev", + { + "response": {"id": "resp_prev", "status": "completed"}, + "conversation_history": list(prior_history), + "session_id": "api-test-session", + }, + ) + full_agent_transcript = prior_history + [ + {"role": "user", "content": "Read new file"}, + { + "role": "assistant", + "tool_calls": [ + { + "id": "call_new", + "function": { + "name": "read_file", + "arguments": '{"path":"new.txt"}', + }, + } + ], + }, + { + "role": "tool", + "tool_call_id": "call_new", + "content": '{"content":"new"}', + }, + {"role": "assistant", "content": "new"}, + ] + + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run: + mock_run.return_value = ( + { + "final_response": "new", + "messages": list(full_agent_transcript), + "api_calls": 1, + }, + {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}, + ) + resp = await cli.post( + "/v1/responses", + json={ + "model": "hermes-agent", + "input": "Read new file", + "previous_response_id": "resp_prev", + }, + ) + assert resp.status == 200 + data = await resp.json() + + output_json = json.dumps(data["output"]) + assert "call_new" in output_json + assert "call_old" not in output_json + assert "old.txt" not in output_json + @pytest.mark.asyncio async def test_previous_response_id_preserves_session(self, adapter): """Chained responses via previous_response_id reuse the same session_id.""" @@ -1627,6 +1767,71 @@ class TestResponsesStreaming: assert data["status"] == "completed" assert data["output"][-1]["content"][0]["text"] == "Stored response" + @pytest.mark.asyncio + async def test_streamed_previous_response_id_stores_full_agent_transcript_once(self, adapter): + prior_history = [ + {"role": "user", "content": "What is 1+1?"}, + {"role": "assistant", "content": "2"}, + ] + adapter._response_store.put( + "resp_prev", + { + "response": {"id": "resp_prev", "status": "completed"}, + "conversation_history": list(prior_history), + "session_id": "api-test-session", + }, + ) + + expected_history = prior_history + [ + {"role": "user", "content": "Now add 1 more"}, + {"role": "assistant", "content": "3"}, + ] + + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + async def _mock_run_agent(**kwargs): + cb = kwargs.get("stream_delta_callback") + if cb: + cb("3") + return ( + { + "final_response": "3", + "messages": list(expected_history), + "api_calls": 1, + }, + {"input_tokens": 1, "output_tokens": 1, "total_tokens": 2}, + ) + + with patch.object(adapter, "_run_agent", side_effect=_mock_run_agent): + resp = await cli.post( + "/v1/responses", + json={ + "model": "hermes-agent", + "input": "Now add 1 more", + "previous_response_id": "resp_prev", + "stream": True, + }, + ) + body = await resp.text() + + assert resp.status == 200 + response_id = None + for line in body.splitlines(): + if line.startswith("data: "): + try: + payload = json.loads(line[len("data: "):]) + except json.JSONDecodeError: + continue + if payload.get("type") == "response.completed": + response_id = payload["response"]["id"] + break + + assert response_id + stored_history = adapter._response_store.get(response_id)["conversation_history"] + assert stored_history == expected_history + assert stored_history.count(prior_history[0]) == 1 + assert stored_history.count({"role": "user", "content": "Now add 1 more"}) == 1 + @pytest.mark.asyncio async def test_stream_cancelled_persists_incomplete_snapshot(self, adapter): """Server-side asyncio.CancelledError (shutdown, request timeout) must