mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-11 03:31:55 +00:00
fix(gateway): avoid duplicated responses history
This commit is contained in:
parent
429e78589b
commit
8a96fa48c1
2 changed files with 283 additions and 20 deletions
|
|
@ -1888,12 +1888,12 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||||
"output_tokens": usage.get("output_tokens", 0),
|
"output_tokens": usage.get("output_tokens", 0),
|
||||||
"total_tokens": usage.get("total_tokens", 0),
|
"total_tokens": usage.get("total_tokens", 0),
|
||||||
}
|
}
|
||||||
full_history = list(conversation_history)
|
full_history = self._build_response_conversation_history(
|
||||||
full_history.append({"role": "user", "content": user_message})
|
conversation_history,
|
||||||
if isinstance(result, dict) and result.get("messages"):
|
user_message,
|
||||||
full_history.extend(result["messages"])
|
result,
|
||||||
else:
|
final_response_text,
|
||||||
full_history.append({"role": "assistant", "content": final_response_text})
|
)
|
||||||
_persist_response_snapshot(
|
_persist_response_snapshot(
|
||||||
completed_env,
|
completed_env,
|
||||||
conversation_history_snapshot=full_history,
|
conversation_history_snapshot=full_history,
|
||||||
|
|
@ -2192,17 +2192,22 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||||
|
|
||||||
# Build the full conversation history for storage
|
# Build the full conversation history for storage
|
||||||
# (includes tool calls from the agent run)
|
# (includes tool calls from the agent run)
|
||||||
full_history = list(conversation_history)
|
full_history = self._build_response_conversation_history(
|
||||||
full_history.append({"role": "user", "content": user_message})
|
conversation_history,
|
||||||
# Add agent's internal messages if available
|
user_message,
|
||||||
agent_messages = result.get("messages", [])
|
result,
|
||||||
if agent_messages:
|
final_response,
|
||||||
full_history.extend(agent_messages)
|
)
|
||||||
else:
|
|
||||||
full_history.append({"role": "assistant", "content": final_response})
|
|
||||||
|
|
||||||
# Build output items (includes tool calls + final message)
|
# Build output items from the current turn only. AIAgent returns a
|
||||||
output_items = self._extract_output_items(result)
|
# full transcript in result["messages"], while older/mocked paths may
|
||||||
|
# return only the current turn suffix.
|
||||||
|
output_start_index = self._response_messages_turn_start_index(
|
||||||
|
conversation_history,
|
||||||
|
user_message,
|
||||||
|
result,
|
||||||
|
)
|
||||||
|
output_items = self._extract_output_items(result, start_index=output_start_index)
|
||||||
|
|
||||||
response_data = {
|
response_data = {
|
||||||
"id": response_id,
|
"id": response_id,
|
||||||
|
|
@ -2494,17 +2499,70 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _extract_output_items(result: Dict[str, Any]) -> List[Dict[str, Any]]:
|
def _build_response_conversation_history(
|
||||||
"""
|
conversation_history: List[Dict[str, Any]],
|
||||||
Build the full output item array from the agent's messages.
|
user_message: Any,
|
||||||
|
result: Dict[str, Any],
|
||||||
|
final_response: Any,
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
|
"""Build the stored Responses transcript without duplicating history."""
|
||||||
|
prior = list(conversation_history)
|
||||||
|
current_user = {"role": "user", "content": user_message}
|
||||||
|
agent_messages = result.get("messages") if isinstance(result, dict) else None
|
||||||
|
|
||||||
Walks *result["messages"]* and emits:
|
if isinstance(agent_messages, list) and agent_messages:
|
||||||
|
turn_start = APIServerAdapter._response_messages_turn_start_index(
|
||||||
|
conversation_history,
|
||||||
|
user_message,
|
||||||
|
result,
|
||||||
|
)
|
||||||
|
if turn_start:
|
||||||
|
return list(agent_messages)
|
||||||
|
|
||||||
|
full_history = prior
|
||||||
|
full_history.append(current_user)
|
||||||
|
full_history.extend(agent_messages)
|
||||||
|
return full_history
|
||||||
|
|
||||||
|
full_history = prior
|
||||||
|
full_history.append(current_user)
|
||||||
|
full_history.append({"role": "assistant", "content": final_response})
|
||||||
|
return full_history
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _response_messages_turn_start_index(
|
||||||
|
conversation_history: List[Dict[str, Any]],
|
||||||
|
user_message: Any,
|
||||||
|
result: Dict[str, Any],
|
||||||
|
) -> int:
|
||||||
|
"""Detect transcript-shaped result["messages"] and return turn start."""
|
||||||
|
agent_messages = result.get("messages") if isinstance(result, dict) else None
|
||||||
|
if not isinstance(agent_messages, list) or not agent_messages:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
prior = list(conversation_history)
|
||||||
|
current_user = {"role": "user", "content": user_message}
|
||||||
|
expected_prefix = prior + [current_user]
|
||||||
|
if agent_messages[:len(expected_prefix)] == expected_prefix:
|
||||||
|
return len(expected_prefix)
|
||||||
|
if prior and agent_messages[:len(prior)] == prior:
|
||||||
|
return len(prior)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _extract_output_items(result: Dict[str, Any], start_index: int = 0) -> List[Dict[str, Any]]:
|
||||||
|
"""
|
||||||
|
Build the output item array from the agent's messages.
|
||||||
|
|
||||||
|
Walks *result["messages"]* starting at *start_index* and emits:
|
||||||
- ``function_call`` items for each tool_call on assistant messages
|
- ``function_call`` items for each tool_call on assistant messages
|
||||||
- ``function_call_output`` items for each tool-role message
|
- ``function_call_output`` items for each tool-role message
|
||||||
- a final ``message`` item with the assistant's text reply
|
- a final ``message`` item with the assistant's text reply
|
||||||
"""
|
"""
|
||||||
items: List[Dict[str, Any]] = []
|
items: List[Dict[str, Any]] = []
|
||||||
messages = result.get("messages", [])
|
messages = result.get("messages", [])
|
||||||
|
if start_index > 0:
|
||||||
|
messages = messages[start_index:]
|
||||||
|
|
||||||
for msg in messages:
|
for msg in messages:
|
||||||
role = msg.get("role")
|
role = msg.get("role")
|
||||||
|
|
|
||||||
|
|
@ -1360,6 +1360,146 @@ class TestResponsesEndpoint:
|
||||||
assert len(call_kwargs["conversation_history"]) > 0
|
assert len(call_kwargs["conversation_history"]) > 0
|
||||||
assert call_kwargs["user_message"] == "Now add 1 more"
|
assert call_kwargs["user_message"] == "Now add 1 more"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_previous_response_id_stores_full_agent_transcript_once(self, adapter):
|
||||||
|
"""Chained Responses storage must not append result["messages"] twice."""
|
||||||
|
first_history = [
|
||||||
|
{"role": "user", "content": "What is 1+1?"},
|
||||||
|
{"role": "assistant", "content": "2"},
|
||||||
|
]
|
||||||
|
|
||||||
|
app = _create_app(adapter)
|
||||||
|
async with TestClient(TestServer(app)) as cli:
|
||||||
|
with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run:
|
||||||
|
mock_run.return_value = (
|
||||||
|
{
|
||||||
|
"final_response": "2",
|
||||||
|
"messages": list(first_history),
|
||||||
|
"api_calls": 1,
|
||||||
|
},
|
||||||
|
{"input_tokens": 0, "output_tokens": 0, "total_tokens": 0},
|
||||||
|
)
|
||||||
|
resp1 = await cli.post(
|
||||||
|
"/v1/responses",
|
||||||
|
json={"model": "hermes-agent", "input": "What is 1+1?"},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert resp1.status == 200
|
||||||
|
resp1_data = await resp1.json()
|
||||||
|
stored_first = adapter._response_store.get(resp1_data["id"])
|
||||||
|
assert stored_first["conversation_history"] == first_history
|
||||||
|
|
||||||
|
second_history = first_history + [
|
||||||
|
{"role": "user", "content": "Now add 1 more"},
|
||||||
|
{"role": "assistant", "content": "3"},
|
||||||
|
]
|
||||||
|
with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run:
|
||||||
|
mock_run.return_value = (
|
||||||
|
{
|
||||||
|
"final_response": "3",
|
||||||
|
"messages": list(second_history),
|
||||||
|
"api_calls": 1,
|
||||||
|
},
|
||||||
|
{"input_tokens": 0, "output_tokens": 0, "total_tokens": 0},
|
||||||
|
)
|
||||||
|
resp2 = await cli.post(
|
||||||
|
"/v1/responses",
|
||||||
|
json={
|
||||||
|
"model": "hermes-agent",
|
||||||
|
"input": "Now add 1 more",
|
||||||
|
"previous_response_id": resp1_data["id"],
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
assert resp2.status == 200
|
||||||
|
resp2_data = await resp2.json()
|
||||||
|
stored_second = adapter._response_store.get(resp2_data["id"])
|
||||||
|
stored_history = stored_second["conversation_history"]
|
||||||
|
assert stored_history == second_history
|
||||||
|
assert stored_history.count(first_history[0]) == 1
|
||||||
|
assert stored_history.count({"role": "user", "content": "Now add 1 more"}) == 1
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_previous_response_id_outputs_only_current_turn_items(self, adapter):
|
||||||
|
"""Response output must not replay previous tool artifacts."""
|
||||||
|
prior_history = [
|
||||||
|
{"role": "user", "content": "Read old file"},
|
||||||
|
{
|
||||||
|
"role": "assistant",
|
||||||
|
"tool_calls": [
|
||||||
|
{
|
||||||
|
"id": "call_old",
|
||||||
|
"function": {
|
||||||
|
"name": "read_file",
|
||||||
|
"arguments": '{"path":"old.txt"}',
|
||||||
|
},
|
||||||
|
}
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"role": "tool",
|
||||||
|
"tool_call_id": "call_old",
|
||||||
|
"content": '{"content":"old"}',
|
||||||
|
},
|
||||||
|
{"role": "assistant", "content": "old"},
|
||||||
|
]
|
||||||
|
adapter._response_store.put(
|
||||||
|
"resp_prev",
|
||||||
|
{
|
||||||
|
"response": {"id": "resp_prev", "status": "completed"},
|
||||||
|
"conversation_history": list(prior_history),
|
||||||
|
"session_id": "api-test-session",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
full_agent_transcript = prior_history + [
|
||||||
|
{"role": "user", "content": "Read new file"},
|
||||||
|
{
|
||||||
|
"role": "assistant",
|
||||||
|
"tool_calls": [
|
||||||
|
{
|
||||||
|
"id": "call_new",
|
||||||
|
"function": {
|
||||||
|
"name": "read_file",
|
||||||
|
"arguments": '{"path":"new.txt"}',
|
||||||
|
},
|
||||||
|
}
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"role": "tool",
|
||||||
|
"tool_call_id": "call_new",
|
||||||
|
"content": '{"content":"new"}',
|
||||||
|
},
|
||||||
|
{"role": "assistant", "content": "new"},
|
||||||
|
]
|
||||||
|
|
||||||
|
app = _create_app(adapter)
|
||||||
|
async with TestClient(TestServer(app)) as cli:
|
||||||
|
with patch.object(adapter, "_run_agent", new_callable=AsyncMock) as mock_run:
|
||||||
|
mock_run.return_value = (
|
||||||
|
{
|
||||||
|
"final_response": "new",
|
||||||
|
"messages": list(full_agent_transcript),
|
||||||
|
"api_calls": 1,
|
||||||
|
},
|
||||||
|
{"input_tokens": 0, "output_tokens": 0, "total_tokens": 0},
|
||||||
|
)
|
||||||
|
resp = await cli.post(
|
||||||
|
"/v1/responses",
|
||||||
|
json={
|
||||||
|
"model": "hermes-agent",
|
||||||
|
"input": "Read new file",
|
||||||
|
"previous_response_id": "resp_prev",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert resp.status == 200
|
||||||
|
data = await resp.json()
|
||||||
|
|
||||||
|
output_json = json.dumps(data["output"])
|
||||||
|
assert "call_new" in output_json
|
||||||
|
assert "call_old" not in output_json
|
||||||
|
assert "old.txt" not in output_json
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_previous_response_id_preserves_session(self, adapter):
|
async def test_previous_response_id_preserves_session(self, adapter):
|
||||||
"""Chained responses via previous_response_id reuse the same session_id."""
|
"""Chained responses via previous_response_id reuse the same session_id."""
|
||||||
|
|
@ -1627,6 +1767,71 @@ class TestResponsesStreaming:
|
||||||
assert data["status"] == "completed"
|
assert data["status"] == "completed"
|
||||||
assert data["output"][-1]["content"][0]["text"] == "Stored response"
|
assert data["output"][-1]["content"][0]["text"] == "Stored response"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_streamed_previous_response_id_stores_full_agent_transcript_once(self, adapter):
|
||||||
|
prior_history = [
|
||||||
|
{"role": "user", "content": "What is 1+1?"},
|
||||||
|
{"role": "assistant", "content": "2"},
|
||||||
|
]
|
||||||
|
adapter._response_store.put(
|
||||||
|
"resp_prev",
|
||||||
|
{
|
||||||
|
"response": {"id": "resp_prev", "status": "completed"},
|
||||||
|
"conversation_history": list(prior_history),
|
||||||
|
"session_id": "api-test-session",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
expected_history = prior_history + [
|
||||||
|
{"role": "user", "content": "Now add 1 more"},
|
||||||
|
{"role": "assistant", "content": "3"},
|
||||||
|
]
|
||||||
|
|
||||||
|
app = _create_app(adapter)
|
||||||
|
async with TestClient(TestServer(app)) as cli:
|
||||||
|
async def _mock_run_agent(**kwargs):
|
||||||
|
cb = kwargs.get("stream_delta_callback")
|
||||||
|
if cb:
|
||||||
|
cb("3")
|
||||||
|
return (
|
||||||
|
{
|
||||||
|
"final_response": "3",
|
||||||
|
"messages": list(expected_history),
|
||||||
|
"api_calls": 1,
|
||||||
|
},
|
||||||
|
{"input_tokens": 1, "output_tokens": 1, "total_tokens": 2},
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch.object(adapter, "_run_agent", side_effect=_mock_run_agent):
|
||||||
|
resp = await cli.post(
|
||||||
|
"/v1/responses",
|
||||||
|
json={
|
||||||
|
"model": "hermes-agent",
|
||||||
|
"input": "Now add 1 more",
|
||||||
|
"previous_response_id": "resp_prev",
|
||||||
|
"stream": True,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
body = await resp.text()
|
||||||
|
|
||||||
|
assert resp.status == 200
|
||||||
|
response_id = None
|
||||||
|
for line in body.splitlines():
|
||||||
|
if line.startswith("data: "):
|
||||||
|
try:
|
||||||
|
payload = json.loads(line[len("data: "):])
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
continue
|
||||||
|
if payload.get("type") == "response.completed":
|
||||||
|
response_id = payload["response"]["id"]
|
||||||
|
break
|
||||||
|
|
||||||
|
assert response_id
|
||||||
|
stored_history = adapter._response_store.get(response_id)["conversation_history"]
|
||||||
|
assert stored_history == expected_history
|
||||||
|
assert stored_history.count(prior_history[0]) == 1
|
||||||
|
assert stored_history.count({"role": "user", "content": "Now add 1 more"}) == 1
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_stream_cancelled_persists_incomplete_snapshot(self, adapter):
|
async def test_stream_cancelled_persists_incomplete_snapshot(self, adapter):
|
||||||
"""Server-side asyncio.CancelledError (shutdown, request timeout) must
|
"""Server-side asyncio.CancelledError (shutdown, request timeout) must
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue