mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-09 08:21:50 +00:00
fix: backfill codex stream output from output_item.done events (#5689)
Salvages the core fix from PR #5673 (egerev) onto current main. The chatgpt.com/backend-api/codex endpoint streams valid output items via response.output_item.done events, but the OpenAI SDK's get_final_response() returns an empty output list. This caused every Codex response to be rejected as invalid. Fix: collect output_item.done events during streaming and backfill response.output when get_final_response() returns empty. Falls back to synthesizing from text deltas when no done events were received. Also moves the synthesis logic from the validation loop (too late, from #5681) into _run_codex_stream() (before the response leaves the streaming function), and simplifies the validation to just log diagnostics since recovery now happens upstream. Co-authored-by: Egor <egerev@users.noreply.github.com>
This commit is contained in:
parent
e5aaa38ca7
commit
0e336b0e71
1 changed files with 44 additions and 40 deletions
84
run_agent.py
84
run_agent.py
|
|
@ -3873,6 +3873,7 @@ class AIAgent:
|
|||
# response.incomplete instead of response.completed).
|
||||
self._codex_streamed_text_parts: list = []
|
||||
for attempt in range(max_stream_retries + 1):
|
||||
collected_output_items: list = []
|
||||
try:
|
||||
with active_client.responses.stream(**api_kwargs) as stream:
|
||||
for event in stream:
|
||||
|
|
@ -3882,6 +3883,8 @@ class AIAgent:
|
|||
# Fire callbacks on text content deltas (suppress during tool calls)
|
||||
if "output_text.delta" in event_type or event_type == "response.output_text.delta":
|
||||
delta_text = getattr(event, "delta", "")
|
||||
if delta_text:
|
||||
self._codex_streamed_text_parts.append(delta_text)
|
||||
if delta_text and not has_tool_calls:
|
||||
if not first_delta_fired:
|
||||
first_delta_fired = True
|
||||
|
|
@ -3891,7 +3894,6 @@ class AIAgent:
|
|||
except Exception:
|
||||
pass
|
||||
self._fire_stream_delta(delta_text)
|
||||
self._codex_streamed_text_parts.append(delta_text)
|
||||
# Track tool calls to suppress text streaming
|
||||
elif "function_call" in event_type:
|
||||
has_tool_calls = True
|
||||
|
|
@ -3900,6 +3902,14 @@ class AIAgent:
|
|||
reasoning_text = getattr(event, "delta", "")
|
||||
if reasoning_text:
|
||||
self._fire_reasoning_delta(reasoning_text)
|
||||
# Collect completed output items — some backends
|
||||
# (chatgpt.com/backend-api/codex) stream valid items
|
||||
# via response.output_item.done but the SDK's
|
||||
# get_final_response() returns an empty output list.
|
||||
elif event_type == "response.output_item.done":
|
||||
done_item = getattr(event, "item", None)
|
||||
if done_item is not None:
|
||||
collected_output_items.append(done_item)
|
||||
# Log non-completed terminal events for diagnostics
|
||||
elif event_type in ("response.incomplete", "response.failed"):
|
||||
resp_obj = getattr(event, "response", None)
|
||||
|
|
@ -3912,7 +3922,31 @@ class AIAgent:
|
|||
sum(len(p) for p in self._codex_streamed_text_parts),
|
||||
self._client_log_context(),
|
||||
)
|
||||
return stream.get_final_response()
|
||||
final_response = stream.get_final_response()
|
||||
# PATCH: ChatGPT Codex backend streams valid output items
|
||||
# but get_final_response() can return an empty output list.
|
||||
# Backfill from collected items or synthesize from deltas.
|
||||
_out = getattr(final_response, "output", None)
|
||||
if isinstance(_out, list) and not _out:
|
||||
if collected_output_items:
|
||||
final_response.output = list(collected_output_items)
|
||||
logger.debug(
|
||||
"Codex stream: backfilled %d output items from stream events",
|
||||
len(collected_output_items),
|
||||
)
|
||||
elif self._codex_streamed_text_parts and not has_tool_calls:
|
||||
assembled = "".join(self._codex_streamed_text_parts)
|
||||
final_response.output = [SimpleNamespace(
|
||||
type="message",
|
||||
role="assistant",
|
||||
status="completed",
|
||||
content=[SimpleNamespace(type="output_text", text=assembled)],
|
||||
)]
|
||||
logger.debug(
|
||||
"Codex stream: synthesized output from %d text deltas (%d chars)",
|
||||
len(self._codex_streamed_text_parts), len(assembled),
|
||||
)
|
||||
return final_response
|
||||
except (_httpx.RemoteProtocolError, _httpx.ReadTimeout, _httpx.ConnectError, ConnectionError) as exc:
|
||||
if attempt < max_stream_retries:
|
||||
logger.debug(
|
||||
|
|
@ -7383,50 +7417,20 @@ class AIAgent:
|
|||
response_invalid = True
|
||||
error_details.append("response.output is not a list")
|
||||
elif len(output_items) == 0:
|
||||
# Log diagnostics for empty output
|
||||
# If we reach here, _run_codex_stream's backfill
|
||||
# from output_item.done events and text-delta
|
||||
# synthesis both failed to populate output.
|
||||
_resp_status = getattr(response, "status", None)
|
||||
_resp_incomplete = getattr(response, "incomplete_details", None)
|
||||
_streamed_parts = getattr(self, "_codex_streamed_text_parts", [])
|
||||
_streamed_text = "".join(_streamed_parts).strip() if _streamed_parts else ""
|
||||
logging.warning(
|
||||
"Codex response.output is empty "
|
||||
"(status=%s, incomplete_details=%s, streamed_chars=%d, "
|
||||
"output_text=%r, model=%s). %s",
|
||||
_resp_status, _resp_incomplete, len(_streamed_text),
|
||||
getattr(response, "output_text", None),
|
||||
"Codex response.output is empty after stream backfill "
|
||||
"(status=%s, incomplete_details=%s, model=%s). %s",
|
||||
_resp_status, _resp_incomplete,
|
||||
getattr(response, "model", None),
|
||||
f"api_mode={self.api_mode} provider={self.provider}",
|
||||
)
|
||||
# Recovery: if we streamed text but the final response
|
||||
# lost it (e.g. response.incomplete from chatgpt backend-api),
|
||||
# synthesize a minimal response so the user gets the answer
|
||||
# the model already delivered.
|
||||
if _streamed_text:
|
||||
logging.info(
|
||||
"Recovering %d chars of streamed text as response "
|
||||
"(status was %s).", len(_streamed_text), _resp_status,
|
||||
)
|
||||
response = SimpleNamespace(
|
||||
output=[SimpleNamespace(
|
||||
type="message",
|
||||
role="assistant",
|
||||
status="completed",
|
||||
content=[SimpleNamespace(
|
||||
type="output_text",
|
||||
text=_streamed_text,
|
||||
)],
|
||||
)],
|
||||
status=_resp_status or "completed",
|
||||
model=getattr(response, "model", self.model),
|
||||
usage=getattr(response, "usage", None),
|
||||
id=getattr(response, "id", None),
|
||||
output_text=_streamed_text,
|
||||
)
|
||||
# Clear the accumulated parts so we don't double-recover
|
||||
self._codex_streamed_text_parts = []
|
||||
else:
|
||||
response_invalid = True
|
||||
error_details.append("response.output is empty")
|
||||
response_invalid = True
|
||||
error_details.append("response.output is empty")
|
||||
elif self.api_mode == "anthropic_messages":
|
||||
content_blocks = getattr(response, "content", None) if response is not None else None
|
||||
if response is None:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue