feat: add ResponsesApiTransport + wire all Codex transport paths

Add ResponsesApiTransport wrapping codex_responses_adapter.py behind the
ProviderTransport ABC. Auto-registered via _discover_transports().

Wire ALL Codex transport methods to production paths in run_agent.py:
- build_kwargs: main _build_api_kwargs codex branch (50 lines extracted)
- normalize_response: main loop + flush + summary + retry (4 sites)
- convert_tools: memory flush tool override
- convert_messages: called internally via build_kwargs
- validate_response: response validation gate
- preflight_kwargs: request sanitization (2 sites)

Remove 7 dead legacy wrappers from AIAgent (_responses_tools,
_chat_messages_to_responses_input, _normalize_codex_response,
_preflight_codex_api_kwargs, _preflight_codex_input_items,
_extract_responses_message_text, _extract_responses_reasoning_text).
Keep 3 ID manipulation methods still used by _build_assistant_message.

Update 18 test call sites across 3 test files to call adapter functions
directly instead of through deleted AIAgent wrappers.

24 new tests. 343 codex/responses/transport tests pass (0 failures).

PR 4 of the provider transport refactor.
This commit is contained in:
kshitijk4poor 2026-04-21 14:24:41 +05:30 committed by Teknium
parent 09dd5eb6a5
commit c832ebd67c
7 changed files with 589 additions and 169 deletions

View file

@ -37,3 +37,7 @@ def _discover_transports() -> None:
import agent.transports.anthropic # noqa: F401
except ImportError:
pass
try:
import agent.transports.codex # noqa: F401
except ImportError:
pass

217
agent/transports/codex.py Normal file
View file

@ -0,0 +1,217 @@
"""OpenAI Responses API (Codex) transport.
Delegates to the existing adapter functions in agent/codex_responses_adapter.py.
This transport owns format conversion and normalization NOT client lifecycle,
streaming, or the _run_codex_stream() call path.
"""
from typing import Any, Dict, List, Optional
from agent.transports.base import ProviderTransport
from agent.transports.types import NormalizedResponse, ToolCall, Usage
class ResponsesApiTransport(ProviderTransport):
    """Transport for api_mode='codex_responses'.

    Wraps the functions extracted into codex_responses_adapter.py (PR 1).
    This transport owns format conversion and normalization — NOT client
    lifecycle, streaming, or the _run_codex_stream() call path, which stay
    with the caller.
    """

    @property
    def api_mode(self) -> str:
        # Registry key; must match the register_transport() call at module import.
        return "codex_responses"

    def convert_messages(self, messages: List[Dict[str, Any]], **kwargs) -> Any:
        """Convert OpenAI chat messages to Responses API input items."""
        from agent.codex_responses_adapter import _chat_messages_to_responses_input
        return _chat_messages_to_responses_input(messages)

    def convert_tools(self, tools: List[Dict[str, Any]]) -> Any:
        """Convert OpenAI tool schemas to Responses API function definitions."""
        from agent.codex_responses_adapter import _responses_tools
        return _responses_tools(tools)

    def build_kwargs(
        self,
        model: str,
        messages: List[Dict[str, Any]],
        tools: Optional[List[Dict[str, Any]]] = None,
        **params,
    ) -> Dict[str, Any]:
        """Build Responses API kwargs.

        Calls convert_messages and convert_tools internally.

        params:
            instructions: str               system prompt (extracted from messages[0] if not given)
            reasoning_config: dict | None   {effort, enabled}
            session_id: str | None          used for prompt_cache_key + xAI conv header
            max_tokens: int | None          max_output_tokens
            request_overrides: dict | None  extra kwargs merged in
            provider: str | None            provider name for backend-specific logic
            base_url: str | None            endpoint URL
            base_url_hostname: str | None   hostname for backend detection
            is_github_responses: bool       Copilot/GitHub models backend
            is_codex_backend: bool          chatgpt.com/backend-api/codex
            is_xai_responses: bool          xAI/Grok backend
            github_reasoning_extra: dict | None  Copilot reasoning params
        """
        from agent.codex_responses_adapter import (
            _chat_messages_to_responses_input,
            _responses_tools,
        )
        from run_agent import DEFAULT_AGENT_IDENTITY

        # Resolve the system prompt: explicit param wins, then a leading
        # system message, then the project-wide default identity.
        instructions = params.get("instructions", "")
        payload_messages = messages
        if not instructions:
            if messages and messages[0].get("role") == "system":
                instructions = str(messages[0].get("content") or "").strip()
                payload_messages = messages[1:]
        if not instructions:
            instructions = DEFAULT_AGENT_IDENTITY

        is_github_responses = params.get("is_github_responses", False)
        is_codex_backend = params.get("is_codex_backend", False)
        is_xai_responses = params.get("is_xai_responses", False)

        # Resolve reasoning effort: config > default (medium).
        reasoning_effort = "medium"
        reasoning_enabled = True
        reasoning_config = params.get("reasoning_config")
        if reasoning_config and isinstance(reasoning_config, dict):
            if reasoning_config.get("enabled") is False:
                reasoning_enabled = False
            elif reasoning_config.get("effort"):
                reasoning_effort = reasoning_config["effort"]
        # Clamp effort levels not accepted by every Responses backend.
        _effort_clamp = {"minimal": "low"}
        reasoning_effort = _effort_clamp.get(reasoning_effort, reasoning_effort)

        kwargs = {
            "model": model,
            "instructions": instructions,
            "input": _chat_messages_to_responses_input(payload_messages),
            "tools": _responses_tools(tools),
            "tool_choice": "auto",
            "parallel_tool_calls": True,
            "store": False,
        }
        session_id = params.get("session_id")
        if not is_github_responses and session_id:
            kwargs["prompt_cache_key"] = session_id
        if reasoning_enabled and is_xai_responses:
            # xAI reasons automatically — no effort param, only the
            # encrypted reasoning content is requested.
            kwargs["include"] = ["reasoning.encrypted_content"]
        elif reasoning_enabled:
            if is_github_responses:
                # Copilot's Responses route takes its own reasoning params and
                # no OpenAI-specific cache / encrypted-reasoning fields.
                github_reasoning = params.get("github_reasoning_extra")
                if github_reasoning is not None:
                    kwargs["reasoning"] = github_reasoning
            else:
                kwargs["reasoning"] = {"effort": reasoning_effort, "summary": "auto"}
                kwargs["include"] = ["reasoning.encrypted_content"]
        elif not is_github_responses and not is_xai_responses:
            kwargs["include"] = []
        request_overrides = params.get("request_overrides")
        if request_overrides:
            kwargs.update(request_overrides)
        max_tokens = params.get("max_tokens")
        if max_tokens is not None and not is_codex_backend:
            # max_output_tokens is deliberately omitted on the codex backend path.
            kwargs["max_output_tokens"] = max_tokens
        if is_xai_responses and session_id:
            kwargs["extra_headers"] = {"x-grok-conv-id": session_id}
        return kwargs

    def normalize_response(self, response: Any, **kwargs) -> NormalizedResponse:
        """Normalize a Codex Responses API response to a NormalizedResponse."""
        # NOTE: the previously imported _extract_responses_message_text and
        # _extract_responses_reasoning_text helpers were unused here; removed.
        from agent.codex_responses_adapter import _normalize_codex_response

        # _normalize_codex_response returns (SimpleNamespace, finish_reason_str).
        msg, finish_reason = _normalize_codex_response(response)
        tool_calls = None
        if msg and msg.tool_calls:
            tool_calls = []
            for tc in msg.tool_calls:
                # Preserve codex-specific call identifiers so the caller can
                # rebuild Responses `function_call` items later.
                provider_data = {}
                if hasattr(tc, "call_id") and tc.call_id:
                    provider_data["call_id"] = tc.call_id
                if hasattr(tc, "response_item_id") and tc.response_item_id:
                    provider_data["response_item_id"] = tc.response_item_id
                tool_calls.append(ToolCall(
                    id=tc.id if hasattr(tc, "id") else (tc.function.name if hasattr(tc, "function") else None),
                    name=tc.function.name if hasattr(tc, "function") else getattr(tc, "name", ""),
                    arguments=tc.function.arguments if hasattr(tc, "function") else getattr(tc, "arguments", "{}"),
                    provider_data=provider_data or None,
                ))
        # Carry reasoning items through provider_data for downstream replay.
        provider_data = {}
        if msg and hasattr(msg, "codex_reasoning_items") and msg.codex_reasoning_items:
            provider_data["codex_reasoning_items"] = msg.codex_reasoning_items
        if msg and hasattr(msg, "reasoning_details") and msg.reasoning_details:
            provider_data["reasoning_details"] = msg.reasoning_details
        return NormalizedResponse(
            content=msg.content if msg else None,
            tool_calls=tool_calls,
            finish_reason=finish_reason or "stop",
            reasoning=msg.reasoning if msg and hasattr(msg, "reasoning") else None,
            usage=None,  # Codex usage is extracted separately in normalize_usage()
            provider_data=provider_data or None,
        )

    def validate_response(self, response: Any) -> bool:
        """Check a Codex Responses API response has a valid output structure.

        Returns True only if response.output is a non-empty list.
        Does NOT check the output_text fallback — the caller handles that
        with diagnostic logging for stream backfill recovery.
        """
        if response is None:
            return False
        output = getattr(response, "output", None)
        if not isinstance(output, list) or not output:
            return False
        return True

    def preflight_kwargs(self, api_kwargs: Any, *, allow_stream: bool = False) -> dict:
        """Validate and sanitize Codex API kwargs before the call.

        Normalizes input items, strips unsupported fields, validates structure.
        """
        from agent.codex_responses_adapter import _preflight_codex_api_kwargs
        return _preflight_codex_api_kwargs(api_kwargs, allow_stream=allow_stream)

    def map_finish_reason(self, raw_reason: str) -> str:
        """Map Codex response.status to an OpenAI finish_reason.

        Codex uses response.status ('completed', 'incomplete') plus
        response.incomplete_details.reason for granular mapping.
        This method handles the simple status string; the caller
        should check incomplete_details separately for 'max_output_tokens'.
        """
        _MAP = {
            "completed": "stop",
            "incomplete": "length",
            "failed": "stop",
            "cancelled": "stop",
        }
        return _MAP.get(raw_reason, "stop")
# Auto-register on import: importing this module is enough to make the
# transport resolvable via agent.transports.get_transport("codex_responses")
# (see _discover_transports(), which imports agent.transports.codex).
from agent.transports import register_transport  # noqa: E402
register_transport("codex_responses", ResponsesApiTransport)

View file

@ -4308,10 +4308,6 @@ class AIAgent:
if self._memory_store:
self._memory_store.load_from_disk()
def _responses_tools(self, tools: Optional[List[Dict[str, Any]]] = None) -> Optional[List[Dict[str, Any]]]:
"""Convert chat-completions tool schemas to Responses function-tool schemas."""
return _codex_responses_tools(tools if tools is not None else self.tools)
@staticmethod
def _deterministic_call_id(fn_name: str, arguments: str, index: int = 0) -> str:
"""Generate a deterministic call_id from tool call content.
@ -4335,33 +4331,6 @@ class AIAgent:
"""Build a valid Responses `function_call.id` (must start with `fc_`)."""
return _codex_derive_responses_function_call_id(call_id, response_item_id)
def _chat_messages_to_responses_input(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Convert internal chat-style messages to Responses input items."""
return _codex_chat_messages_to_responses_input(messages)
def _preflight_codex_input_items(self, raw_items: Any) -> List[Dict[str, Any]]:
return _codex_preflight_codex_input_items(raw_items)
def _preflight_codex_api_kwargs(
self,
api_kwargs: Any,
*,
allow_stream: bool = False,
) -> Dict[str, Any]:
return _codex_preflight_codex_api_kwargs(api_kwargs, allow_stream=allow_stream)
def _extract_responses_message_text(self, item: Any) -> str:
"""Extract assistant text from a Responses message output item."""
return _codex_extract_responses_message_text(item)
def _extract_responses_reasoning_text(self, item: Any) -> str:
"""Extract a compact reasoning text from a Responses reasoning item."""
return _codex_extract_responses_reasoning_text(item)
def _normalize_codex_response(self, response: Any) -> tuple[Any, str]:
"""Normalize a Responses API object to an assistant_message-like object."""
return _codex_normalize_codex_response(response)
def _thread_identity(self) -> str:
thread = threading.current_thread()
return f"{thread.name}:{thread.ident}"
@ -4854,7 +4823,7 @@ class AIAgent:
active_client = client or self._ensure_primary_openai_client(reason="codex_create_stream_fallback")
fallback_kwargs = dict(api_kwargs)
fallback_kwargs["stream"] = True
fallback_kwargs = self._preflight_codex_api_kwargs(fallback_kwargs, allow_stream=True)
fallback_kwargs = self._get_codex_transport().preflight_kwargs(fallback_kwargs, allow_stream=True)
stream_or_response = active_client.responses.create(**fallback_kwargs)
# Compatibility shim for mocks or providers that still return a concrete response.
@ -6596,6 +6565,15 @@ class AIAgent:
self._anthropic_transport = t
return t
def _get_codex_transport(self):
"""Return the cached ResponsesApiTransport instance (lazy singleton)."""
t = getattr(self, "_codex_transport", None)
if t is None:
from agent.transports import get_transport
t = get_transport("codex_responses")
self._codex_transport = t
return t
def _prepare_anthropic_messages_for_api(self, api_messages: list) -> list:
if not any(
isinstance(msg, dict) and self._content_has_image_parts(msg.get("content"))
@ -6752,14 +6730,7 @@ class AIAgent:
}
if self.api_mode == "codex_responses":
instructions = ""
payload_messages = api_messages
if api_messages and api_messages[0].get("role") == "system":
instructions = str(api_messages[0].get("content") or "").strip()
payload_messages = api_messages[1:]
if not instructions:
instructions = DEFAULT_AGENT_IDENTITY
_ct = self._get_codex_transport()
is_github_responses = (
base_url_host_matches(self.base_url, "models.github.ai")
or base_url_host_matches(self.base_url, "api.githubcopilot.com")
@ -6771,64 +6742,20 @@ class AIAgent:
and "/backend-api/codex" in self._base_url_lower
)
)
# Resolve reasoning effort: config > default (medium)
reasoning_effort = "medium"
reasoning_enabled = True
if self.reasoning_config and isinstance(self.reasoning_config, dict):
if self.reasoning_config.get("enabled") is False:
reasoning_enabled = False
elif self.reasoning_config.get("effort"):
reasoning_effort = self.reasoning_config["effort"]
# Clamp effort levels not supported by the Responses API model.
# GPT-5.4 supports none/low/medium/high/xhigh but not "minimal".
# "minimal" is valid on OpenRouter and GPT-5 but fails on 5.2/5.4.
_effort_clamp = {"minimal": "low"}
reasoning_effort = _effort_clamp.get(reasoning_effort, reasoning_effort)
kwargs = {
"model": self.model,
"instructions": instructions,
"input": self._chat_messages_to_responses_input(payload_messages),
"tools": self._responses_tools(),
"tool_choice": "auto",
"parallel_tool_calls": True,
"store": False,
}
if not is_github_responses:
kwargs["prompt_cache_key"] = self.session_id
is_xai_responses = self.provider == "xai" or self._base_url_hostname == "api.x.ai"
if reasoning_enabled and is_xai_responses:
# xAI reasons automatically — no effort param, just include encrypted content
kwargs["include"] = ["reasoning.encrypted_content"]
elif reasoning_enabled:
if is_github_responses:
# Copilot's Responses route advertises reasoning-effort support,
# but not OpenAI-specific prompt cache or encrypted reasoning
# fields. Keep the payload to the documented subset.
github_reasoning = self._github_models_reasoning_extra_body()
if github_reasoning is not None:
kwargs["reasoning"] = github_reasoning
else:
kwargs["reasoning"] = {"effort": reasoning_effort, "summary": "auto"}
kwargs["include"] = ["reasoning.encrypted_content"]
elif not is_github_responses and not is_xai_responses:
kwargs["include"] = []
if self.request_overrides:
kwargs.update(self.request_overrides)
if self.max_tokens is not None and not is_codex_backend:
kwargs["max_output_tokens"] = self.max_tokens
if is_xai_responses and getattr(self, "session_id", None):
kwargs["extra_headers"] = {"x-grok-conv-id": self.session_id}
return kwargs
return _ct.build_kwargs(
model=self.model,
messages=api_messages,
tools=self.tools,
reasoning_config=self.reasoning_config,
session_id=getattr(self, "session_id", None),
max_tokens=self.max_tokens,
request_overrides=self.request_overrides,
is_github_responses=is_github_responses,
is_codex_backend=is_codex_backend,
is_xai_responses=is_xai_responses,
github_reasoning_extra=self._github_models_reasoning_extra_body() if is_github_responses else None,
)
sanitized_messages = api_messages
needs_sanitization = False
@ -7438,7 +7365,7 @@ class AIAgent:
if not _aux_available and self.api_mode == "codex_responses":
# No auxiliary client -- use the Codex Responses path directly
codex_kwargs = self._build_api_kwargs(api_messages)
codex_kwargs["tools"] = self._responses_tools([memory_tool_def])
codex_kwargs["tools"] = self._get_codex_transport().convert_tools([memory_tool_def])
if _flush_temperature is not None:
codex_kwargs["temperature"] = _flush_temperature
else:
@ -7473,9 +7400,15 @@ class AIAgent:
# Extract tool calls from the response, handling all API formats
tool_calls = []
if self.api_mode == "codex_responses" and not _aux_available:
assistant_msg, _ = self._normalize_codex_response(response)
if assistant_msg and assistant_msg.tool_calls:
tool_calls = assistant_msg.tool_calls
_ct_flush = self._get_codex_transport()
_cnr_flush = _ct_flush.normalize_response(response)
if _cnr_flush and _cnr_flush.tool_calls:
tool_calls = [
SimpleNamespace(
id=tc.id, type="function",
function=SimpleNamespace(name=tc.name, arguments=tc.arguments),
) for tc in _cnr_flush.tool_calls
]
elif self.api_mode == "anthropic_messages" and not _aux_available:
_tfn = self._get_anthropic_transport()
_flush_nr = _tfn.normalize_response(response, strip_tool_prefix=self._is_anthropic_oauth)
@ -8519,8 +8452,9 @@ class AIAgent:
codex_kwargs = self._build_api_kwargs(api_messages)
codex_kwargs.pop("tools", None)
summary_response = self._run_codex_stream(codex_kwargs)
assistant_message, _ = self._normalize_codex_response(summary_response)
final_response = (assistant_message.content or "").strip() if assistant_message else ""
_ct_sum = self._get_codex_transport()
_cnr_sum = _ct_sum.normalize_response(summary_response)
final_response = (_cnr_sum.content or "").strip()
else:
summary_kwargs = {
"model": self.model,
@ -8577,8 +8511,9 @@ class AIAgent:
codex_kwargs = self._build_api_kwargs(api_messages)
codex_kwargs.pop("tools", None)
retry_response = self._run_codex_stream(codex_kwargs)
retry_msg, _ = self._normalize_codex_response(retry_response)
final_response = (retry_msg.content or "").strip() if retry_msg else ""
_ct_retry = self._get_codex_transport()
_cnr_retry = _ct_retry.normalize_response(retry_response)
final_response = (_cnr_retry.content or "").strip()
elif self.api_mode == "anthropic_messages":
_tretry = self._get_anthropic_transport()
_ant_kw2 = _tretry.build_kwargs(model=self.model, messages=api_messages, tools=None,
@ -9340,7 +9275,7 @@ class AIAgent:
if self._force_ascii_payload:
_sanitize_structure_non_ascii(api_kwargs)
if self.api_mode == "codex_responses":
api_kwargs = self._preflight_codex_api_kwargs(api_kwargs, allow_stream=False)
api_kwargs = self._get_codex_transport().preflight_kwargs(api_kwargs, allow_stream=False)
try:
from hermes_cli.plugins import invoke_hook as _invoke_hook
@ -9428,38 +9363,34 @@ class AIAgent:
response_invalid = False
error_details = []
if self.api_mode == "codex_responses":
output_items = getattr(response, "output", None) if response is not None else None
if response is None:
response_invalid = True
error_details.append("response is None")
elif not isinstance(output_items, list):
response_invalid = True
error_details.append("response.output is not a list")
elif not output_items:
# Stream backfill may have failed, but
# _normalize_codex_response can still recover
# from response.output_text. Only mark invalid
# when that fallback is also absent.
_out_text = getattr(response, "output_text", None)
_out_text_stripped = _out_text.strip() if isinstance(_out_text, str) else ""
if _out_text_stripped:
logger.debug(
"Codex response.output is empty but output_text is present "
"(%d chars); deferring to normalization.",
len(_out_text_stripped),
)
else:
_resp_status = getattr(response, "status", None)
_resp_incomplete = getattr(response, "incomplete_details", None)
logger.warning(
"Codex response.output is empty after stream backfill "
"(status=%s, incomplete_details=%s, model=%s). %s",
_resp_status, _resp_incomplete,
getattr(response, "model", None),
f"api_mode={self.api_mode} provider={self.provider}",
)
_ct_v = self._get_codex_transport()
if not _ct_v.validate_response(response):
if response is None:
response_invalid = True
error_details.append("response.output is empty")
error_details.append("response is None")
else:
# output_text fallback: stream backfill may have failed
# but normalize can still recover from output_text
_out_text = getattr(response, "output_text", None)
_out_text_stripped = _out_text.strip() if isinstance(_out_text, str) else ""
if _out_text_stripped:
logger.debug(
"Codex response.output is empty but output_text is present "
"(%d chars); deferring to normalization.",
len(_out_text_stripped),
)
else:
_resp_status = getattr(response, "status", None)
_resp_incomplete = getattr(response, "incomplete_details", None)
logger.warning(
"Codex response.output is empty after stream backfill "
"(status=%s, incomplete_details=%s, model=%s). %s",
_resp_status, _resp_incomplete,
getattr(response, "model", None),
f"api_mode={self.api_mode} provider={self.provider}",
)
response_invalid = True
error_details.append("response.output is empty")
elif self.api_mode == "anthropic_messages":
_tv = self._get_anthropic_transport()
if not _tv.validate_response(response):
@ -10885,7 +10816,40 @@ class AIAgent:
try:
if self.api_mode == "codex_responses":
assistant_message, finish_reason = self._normalize_codex_response(response)
_ct = self._get_codex_transport()
_cnr = _ct.normalize_response(response)
# Back-compat shim: downstream expects SimpleNamespace with
# codex-specific fields (.codex_reasoning_items, .reasoning_details,
# and .call_id/.response_item_id on tool calls).
_tc_list = None
if _cnr.tool_calls:
_tc_list = []
for tc in _cnr.tool_calls:
_tc_ns = SimpleNamespace(
id=tc.id, type="function",
function=SimpleNamespace(name=tc.name, arguments=tc.arguments),
)
if tc.provider_data:
if tc.provider_data.get("call_id"):
_tc_ns.call_id = tc.provider_data["call_id"]
if tc.provider_data.get("response_item_id"):
_tc_ns.response_item_id = tc.provider_data["response_item_id"]
_tc_list.append(_tc_ns)
assistant_message = SimpleNamespace(
content=_cnr.content,
tool_calls=_tc_list or None,
reasoning=_cnr.reasoning,
reasoning_content=None,
codex_reasoning_items=(
_cnr.provider_data.get("codex_reasoning_items")
if _cnr.provider_data else None
),
reasoning_details=(
_cnr.provider_data.get("reasoning_details")
if _cnr.provider_data else None
),
)
finish_reason = _cnr.finish_reason
elif self.api_mode == "anthropic_messages":
_transport = self._get_anthropic_transport()
_nr = _transport.normalize_response(

View file

@ -0,0 +1,220 @@
"""Tests for the ResponsesApiTransport (Codex)."""
import json
import pytest
from types import SimpleNamespace
from agent.transports import get_transport
from agent.transports.types import NormalizedResponse, ToolCall
@pytest.fixture
def transport():
    """Return the registered Codex transport.

    Importing agent.transports.codex triggers its register_transport() call,
    so get_transport() can resolve "codex_responses".
    """
    import agent.transports.codex  # noqa: F401
    return get_transport("codex_responses")
class TestCodexTransportBasic:
    """Smoke tests: registration, api_mode, and tool-schema conversion."""

    def test_api_mode(self, transport):
        mode = transport.api_mode
        assert mode == "codex_responses"

    def test_registered_on_import(self, transport):
        assert transport is not None

    def test_convert_tools(self, transport):
        parameters = {
            "type": "object",
            "properties": {"command": {"type": "string"}},
        }
        chat_tool = {
            "type": "function",
            "function": {
                "name": "terminal",
                "description": "Run a command",
                "parameters": parameters,
            },
        }
        converted = transport.convert_tools([chat_tool])
        assert len(converted) == 1
        first = converted[0]
        assert first["type"] == "function"
        assert first["name"] == "terminal"
class TestCodexBuildKwargs:
    """build_kwargs: instruction extraction, reasoning, caching, backend quirks."""

    def test_basic_kwargs(self, transport):
        messages = [
            {"role": "system", "content": "You are helpful."},
            {"role": "user", "content": "Hello"},
        ]
        kw = transport.build_kwargs(
            model="gpt-5.4",
            messages=messages,
            tools=[],
        )
        assert kw["model"] == "gpt-5.4"
        assert kw["instructions"] == "You are helpful."
        assert "input" in kw
        assert kw["store"] is False

    def test_system_extracted_from_messages(self, transport):
        # Leading system message becomes `instructions`, not an input item.
        messages = [
            {"role": "system", "content": "Custom system prompt"},
            {"role": "user", "content": "Hi"},
        ]
        kw = transport.build_kwargs(model="gpt-5.4", messages=messages, tools=[])
        assert kw["instructions"] == "Custom system prompt"

    def test_no_system_uses_default(self, transport):
        # Without a system message, build_kwargs falls back to DEFAULT_AGENT_IDENTITY.
        messages = [{"role": "user", "content": "Hi"}]
        kw = transport.build_kwargs(model="gpt-5.4", messages=messages, tools=[])
        assert kw["instructions"]  # should be non-empty default

    def test_reasoning_config(self, transport):
        messages = [{"role": "user", "content": "Hi"}]
        kw = transport.build_kwargs(
            model="gpt-5.4", messages=messages, tools=[],
            reasoning_config={"effort": "high"},
        )
        assert kw.get("reasoning", {}).get("effort") == "high"

    def test_reasoning_disabled(self, transport):
        messages = [{"role": "user", "content": "Hi"}]
        kw = transport.build_kwargs(
            model="gpt-5.4", messages=messages, tools=[],
            reasoning_config={"enabled": False},
        )
        assert "reasoning" not in kw or kw.get("include") == []

    def test_session_id_sets_cache_key(self, transport):
        messages = [{"role": "user", "content": "Hi"}]
        kw = transport.build_kwargs(
            model="gpt-5.4", messages=messages, tools=[],
            session_id="test-session-123",
        )
        assert kw.get("prompt_cache_key") == "test-session-123"

    def test_github_responses_no_cache_key(self, transport):
        # GitHub/Copilot backend must not receive prompt_cache_key.
        messages = [{"role": "user", "content": "Hi"}]
        kw = transport.build_kwargs(
            model="gpt-5.4", messages=messages, tools=[],
            session_id="test-session",
            is_github_responses=True,
        )
        assert "prompt_cache_key" not in kw

    def test_max_tokens(self, transport):
        messages = [{"role": "user", "content": "Hi"}]
        kw = transport.build_kwargs(
            model="gpt-5.4", messages=messages, tools=[],
            max_tokens=4096,
        )
        assert kw.get("max_output_tokens") == 4096

    def test_codex_backend_no_max_output_tokens(self, transport):
        # The chatgpt.com codex backend path skips max_output_tokens entirely.
        messages = [{"role": "user", "content": "Hi"}]
        kw = transport.build_kwargs(
            model="gpt-5.4", messages=messages, tools=[],
            max_tokens=4096,
            is_codex_backend=True,
        )
        assert "max_output_tokens" not in kw

    def test_xai_headers(self, transport):
        # xAI conversation tracking: session id flows into x-grok-conv-id.
        messages = [{"role": "user", "content": "Hi"}]
        kw = transport.build_kwargs(
            model="grok-3", messages=messages, tools=[],
            session_id="conv-123",
            is_xai_responses=True,
        )
        assert kw.get("extra_headers", {}).get("x-grok-conv-id") == "conv-123"

    def test_minimal_effort_clamped(self, transport):
        messages = [{"role": "user", "content": "Hi"}]
        kw = transport.build_kwargs(
            model="gpt-5.4", messages=messages, tools=[],
            reasoning_config={"effort": "minimal"},
        )
        # "minimal" should be clamped to "low"
        assert kw.get("reasoning", {}).get("effort") == "low"
class TestCodexValidateResponse:
    """validate_response accepts only a non-empty response.output list."""

    def test_none_response(self, transport):
        result = transport.validate_response(None)
        assert result is False

    def test_empty_output(self, transport):
        resp = SimpleNamespace(output_text=None, output=[])
        assert transport.validate_response(resp) is False

    def test_valid_output(self, transport):
        message_item = {"type": "message", "content": []}
        resp = SimpleNamespace(output=[message_item])
        assert transport.validate_response(resp) is True

    def test_output_text_fallback_not_valid(self, transport):
        """Strictness check: output_text alone does not make a response valid.

        The caller handles the output_text fallback with diagnostic logging.
        """
        resp = SimpleNamespace(output=None, output_text="Some text")
        assert transport.validate_response(resp) is False
class TestCodexMapFinishReason:
    """map_finish_reason: Codex response.status -> OpenAI finish_reason."""

    def test_completed(self, transport):
        assert transport.map_finish_reason("completed") == "stop"

    def test_incomplete(self, transport):
        # 'incomplete' maps to "length"; the caller inspects
        # incomplete_details for the granular reason.
        assert transport.map_finish_reason("incomplete") == "length"

    def test_failed(self, transport):
        assert transport.map_finish_reason("failed") == "stop"

    def test_unknown(self, transport):
        # Any unrecognized status falls back to "stop".
        assert transport.map_finish_reason("unknown_status") == "stop"
class TestCodexNormalizeResponse:
    """normalize_response: text and tool-call responses become NormalizedResponse."""

    def test_text_response(self, transport):
        """Normalize a simple text Codex response."""
        r = SimpleNamespace(
            output=[
                SimpleNamespace(
                    type="message",
                    role="assistant",
                    content=[SimpleNamespace(type="output_text", text="Hello world")],
                    status="completed",
                ),
            ],
            status="completed",
            incomplete_details=None,
            usage=SimpleNamespace(input_tokens=10, output_tokens=5,
                                  input_tokens_details=None, output_tokens_details=None),
        )
        nr = transport.normalize_response(r)
        assert isinstance(nr, NormalizedResponse)
        assert nr.content == "Hello world"
        assert nr.finish_reason == "stop"

    def test_tool_call_response(self, transport):
        """Normalize a Codex response with tool calls."""
        r = SimpleNamespace(
            output=[
                SimpleNamespace(
                    type="function_call",
                    call_id="call_abc123",
                    name="terminal",
                    arguments=json.dumps({"command": "ls"}),
                    id="fc_abc123",
                    status="completed",
                ),
            ],
            status="completed",
            incomplete_details=None,
            usage=SimpleNamespace(input_tokens=10, output_tokens=20,
                                  input_tokens_details=None, output_tokens_details=None),
        )
        nr = transport.normalize_response(r)
        # A function_call output item should yield finish_reason "tool_calls".
        assert nr.finish_reason == "tool_calls"
        assert len(nr.tool_calls) == 1
        tc = nr.tool_calls[0]
        assert tc.name == "terminal"
        assert '"command"' in tc.arguments

View file

@ -12,6 +12,7 @@ from types import SimpleNamespace
from unittest.mock import patch, MagicMock
import pytest
from agent.codex_responses_adapter import _chat_messages_to_responses_input, _normalize_codex_response, _preflight_codex_input_items
sys.modules.setdefault("fire", types.SimpleNamespace(Fire=lambda *a, **k: None))
sys.modules.setdefault("firecrawl", types.SimpleNamespace(Firecrawl=object))
@ -446,7 +447,7 @@ class TestChatMessagesToResponsesInput:
agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
base_url="https://chatgpt.com/backend-api/codex")
messages = [{"role": "user", "content": "hello"}]
items = agent._chat_messages_to_responses_input(messages)
items = _chat_messages_to_responses_input(messages)
assert items == [{"role": "user", "content": "hello"}]
def test_system_messages_filtered(self, monkeypatch):
@ -456,7 +457,7 @@ class TestChatMessagesToResponsesInput:
{"role": "system", "content": "be helpful"},
{"role": "user", "content": "hello"},
]
items = agent._chat_messages_to_responses_input(messages)
items = _chat_messages_to_responses_input(messages)
assert len(items) == 1
assert items[0]["role"] == "user"
@ -472,7 +473,7 @@ class TestChatMessagesToResponsesInput:
"function": {"name": "web_search", "arguments": '{"query": "test"}'},
}],
}]
items = agent._chat_messages_to_responses_input(messages)
items = _chat_messages_to_responses_input(messages)
fc_items = [i for i in items if i.get("type") == "function_call"]
assert len(fc_items) == 1
assert fc_items[0]["name"] == "web_search"
@ -482,7 +483,7 @@ class TestChatMessagesToResponsesInput:
agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses",
base_url="https://chatgpt.com/backend-api/codex")
messages = [{"role": "tool", "tool_call_id": "call_abc", "content": "result here"}]
items = agent._chat_messages_to_responses_input(messages)
items = _chat_messages_to_responses_input(messages)
assert items[0]["type"] == "function_call_output"
assert items[0]["call_id"] == "call_abc"
assert items[0]["output"] == "result here"
@ -502,7 +503,7 @@ class TestChatMessagesToResponsesInput:
},
{"role": "user", "content": "continue"},
]
items = agent._chat_messages_to_responses_input(messages)
items = _chat_messages_to_responses_input(messages)
reasoning_items = [i for i in items if i.get("type") == "reasoning"]
assert len(reasoning_items) == 1
assert reasoning_items[0]["encrypted_content"] == "gAAAA_test_blob"
@ -515,7 +516,7 @@ class TestChatMessagesToResponsesInput:
{"role": "assistant", "content": "hi"},
{"role": "user", "content": "hello"},
]
items = agent._chat_messages_to_responses_input(messages)
items = _chat_messages_to_responses_input(messages)
reasoning_items = [i for i in items if i.get("type") == "reasoning"]
assert len(reasoning_items) == 0
@ -539,7 +540,7 @@ class TestNormalizeCodexResponse:
],
status="completed",
)
msg, reason = agent._normalize_codex_response(response)
msg, reason = _normalize_codex_response(response)
assert msg.content == "Hello!"
assert reason == "stop"
@ -557,7 +558,7 @@ class TestNormalizeCodexResponse:
],
status="completed",
)
msg, reason = agent._normalize_codex_response(response)
msg, reason = _normalize_codex_response(response)
assert msg.content == "42"
assert "math" in msg.reasoning
assert reason == "stop"
@ -576,7 +577,7 @@ class TestNormalizeCodexResponse:
],
status="completed",
)
msg, reason = agent._normalize_codex_response(response)
msg, reason = _normalize_codex_response(response)
assert msg.codex_reasoning_items is not None
assert len(msg.codex_reasoning_items) == 1
assert msg.codex_reasoning_items[0]["encrypted_content"] == "gAAAA_secret_blob_123"
@ -592,7 +593,7 @@ class TestNormalizeCodexResponse:
],
status="completed",
)
msg, reason = agent._normalize_codex_response(response)
msg, reason = _normalize_codex_response(response)
assert msg.codex_reasoning_items is None
def test_tool_calls_extracted(self, monkeypatch):
@ -605,7 +606,7 @@ class TestNormalizeCodexResponse:
],
status="completed",
)
msg, reason = agent._normalize_codex_response(response)
msg, reason = _normalize_codex_response(response)
assert reason == "tool_calls"
assert len(msg.tool_calls) == 1
assert msg.tool_calls[0].function.name == "web_search"
@ -821,7 +822,7 @@ class TestCodexReasoningPreflight:
"summary": [{"type": "summary_text", "text": "Thinking about it"}]},
{"role": "assistant", "content": "hi there"},
]
normalized = agent._preflight_codex_input_items(raw_input)
normalized = _preflight_codex_input_items(raw_input)
reasoning_items = [i for i in normalized if i.get("type") == "reasoning"]
assert len(reasoning_items) == 1
assert reasoning_items[0]["encrypted_content"] == "abc123encrypted"
@ -837,7 +838,7 @@ class TestCodexReasoningPreflight:
raw_input = [
{"type": "reasoning", "encrypted_content": "abc123"},
]
normalized = agent._preflight_codex_input_items(raw_input)
normalized = _preflight_codex_input_items(raw_input)
assert len(normalized) == 1
assert "id" not in normalized[0]
assert normalized[0]["summary"] == [] # default empty summary
@ -849,7 +850,7 @@ class TestCodexReasoningPreflight:
{"type": "reasoning", "encrypted_content": ""},
{"role": "user", "content": "hello"},
]
normalized = agent._preflight_codex_input_items(raw_input)
normalized = _preflight_codex_input_items(raw_input)
reasoning_items = [i for i in normalized if i.get("type") == "reasoning"]
assert len(reasoning_items) == 0
@ -868,7 +869,7 @@ class TestCodexReasoningPreflight:
},
{"role": "user", "content": "follow up"},
]
items = agent._chat_messages_to_responses_input(messages)
items = _chat_messages_to_responses_input(messages)
reasoning_items = [i for i in items if isinstance(i, dict) and i.get("type") == "reasoning"]
assert len(reasoning_items) == 1
assert reasoning_items[0]["encrypted_content"] == "enc123"

View file

@ -16,6 +16,7 @@ from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from agent.codex_responses_adapter import _chat_messages_to_responses_input, _normalize_codex_response, _preflight_codex_input_items
import run_agent
from run_agent import AIAgent
@ -4248,7 +4249,7 @@ class TestNormalizeCodexDictArguments:
json.dumps, not str(), so downstream json.loads() succeeds."""
args_dict = {"query": "weather in NYC", "units": "celsius"}
response = self._make_codex_response("function_call", args_dict)
msg, _ = agent._normalize_codex_response(response)
msg, _ = _normalize_codex_response(response)
tc = msg.tool_calls[0]
parsed = json.loads(tc.function.arguments)
assert parsed == args_dict
@ -4257,7 +4258,7 @@ class TestNormalizeCodexDictArguments:
"""dict arguments from custom_tool_call must also use json.dumps."""
args_dict = {"path": "/tmp/test.txt", "content": "hello"}
response = self._make_codex_response("custom_tool_call", args_dict)
msg, _ = agent._normalize_codex_response(response)
msg, _ = _normalize_codex_response(response)
tc = msg.tool_calls[0]
parsed = json.loads(tc.function.arguments)
assert parsed == args_dict
@ -4266,7 +4267,7 @@ class TestNormalizeCodexDictArguments:
"""String arguments must pass through without modification."""
args_str = '{"query": "test"}'
response = self._make_codex_response("function_call", args_str)
msg, _ = agent._normalize_codex_response(response)
msg, _ = _normalize_codex_response(response)
tc = msg.tool_calls[0]
assert tc.function.arguments == args_str

View file

@ -640,7 +640,8 @@ def test_run_conversation_codex_tool_round_trip(monkeypatch):
def test_chat_messages_to_responses_input_uses_call_id_for_function_call(monkeypatch):
agent = _build_agent(monkeypatch)
items = agent._chat_messages_to_responses_input(
from agent.codex_responses_adapter import _chat_messages_to_responses_input
items = _chat_messages_to_responses_input(
[
{"role": "user", "content": "Run terminal"},
{
@ -668,7 +669,8 @@ def test_chat_messages_to_responses_input_uses_call_id_for_function_call(monkeyp
def test_chat_messages_to_responses_input_accepts_call_pipe_fc_ids(monkeypatch):
agent = _build_agent(monkeypatch)
items = agent._chat_messages_to_responses_input(
from agent.codex_responses_adapter import _chat_messages_to_responses_input
items = _chat_messages_to_responses_input(
[
{"role": "user", "content": "Run terminal"},
{
@ -696,7 +698,8 @@ def test_chat_messages_to_responses_input_accepts_call_pipe_fc_ids(monkeypatch):
def test_preflight_codex_api_kwargs_strips_optional_function_call_id(monkeypatch):
agent = _build_agent(monkeypatch)
preflight = agent._preflight_codex_api_kwargs(
from agent.codex_responses_adapter import _preflight_codex_api_kwargs
preflight = _preflight_codex_api_kwargs(
{
"model": "gpt-5-codex",
"instructions": "You are Hermes.",
@ -724,7 +727,8 @@ def test_preflight_codex_api_kwargs_rejects_function_call_output_without_call_id
agent = _build_agent(monkeypatch)
with pytest.raises(ValueError, match="function_call_output is missing call_id"):
agent._preflight_codex_api_kwargs(
from agent.codex_responses_adapter import _preflight_codex_api_kwargs
_preflight_codex_api_kwargs(
{
"model": "gpt-5-codex",
"instructions": "You are Hermes.",
@ -741,7 +745,8 @@ def test_preflight_codex_api_kwargs_rejects_unsupported_request_fields(monkeypat
kwargs["some_unknown_field"] = "value"
with pytest.raises(ValueError, match="unsupported field"):
agent._preflight_codex_api_kwargs(kwargs)
from agent.codex_responses_adapter import _preflight_codex_api_kwargs
_preflight_codex_api_kwargs(kwargs)
def test_preflight_codex_api_kwargs_allows_reasoning_and_temperature(monkeypatch):
@ -752,7 +757,8 @@ def test_preflight_codex_api_kwargs_allows_reasoning_and_temperature(monkeypatch
kwargs["temperature"] = 0.7
kwargs["max_output_tokens"] = 4096
result = agent._preflight_codex_api_kwargs(kwargs)
from agent.codex_responses_adapter import _preflight_codex_api_kwargs
result = _preflight_codex_api_kwargs(kwargs)
assert result["reasoning"] == {"effort": "high", "summary": "auto"}
assert result["include"] == ["reasoning.encrypted_content"]
assert result["temperature"] == 0.7
@ -764,7 +770,8 @@ def test_preflight_codex_api_kwargs_allows_service_tier(monkeypatch):
kwargs = _codex_request_kwargs()
kwargs["service_tier"] = "priority"
result = agent._preflight_codex_api_kwargs(kwargs)
from agent.codex_responses_adapter import _preflight_codex_api_kwargs
result = _preflight_codex_api_kwargs(kwargs)
assert result["service_tier"] == "priority"
@ -841,7 +848,8 @@ def test_run_conversation_codex_continues_after_incomplete_interim_message(monke
def test_normalize_codex_response_marks_commentary_only_message_as_incomplete(monkeypatch):
agent = _build_agent(monkeypatch)
assistant_message, finish_reason = agent._normalize_codex_response(
from agent.codex_responses_adapter import _normalize_codex_response
assistant_message, finish_reason = _normalize_codex_response(
_codex_commentary_message_response("I'll inspect the repository first.")
)
@ -1068,7 +1076,8 @@ def test_normalize_codex_response_marks_reasoning_only_as_incomplete(monkeypatch
sends them into the empty-content retry loop (3 retries then failure).
"""
agent = _build_agent(monkeypatch)
assistant_message, finish_reason = agent._normalize_codex_response(
from agent.codex_responses_adapter import _normalize_codex_response
assistant_message, finish_reason = _normalize_codex_response(
_codex_reasoning_only_response()
)
@ -1101,7 +1110,8 @@ def test_normalize_codex_response_reasoning_with_content_is_stop(monkeypatch):
status="completed",
model="gpt-5-codex",
)
assistant_message, finish_reason = agent._normalize_codex_response(response)
from agent.codex_responses_adapter import _normalize_codex_response
assistant_message, finish_reason = _normalize_codex_response(response)
assert finish_reason == "stop"
assert "Here is the answer" in assistant_message.content
@ -1186,7 +1196,8 @@ def test_chat_messages_to_responses_input_reasoning_only_has_following_item(monk
],
},
]
items = agent._chat_messages_to_responses_input(messages)
from agent.codex_responses_adapter import _chat_messages_to_responses_input
items = _chat_messages_to_responses_input(messages)
# Find the reasoning item
reasoning_indices = [i for i, it in enumerate(items) if it.get("type") == "reasoning"]
@ -1273,7 +1284,8 @@ def test_chat_messages_to_responses_input_deduplicates_reasoning_ids(monkeypatch
],
},
]
items = agent._chat_messages_to_responses_input(messages)
from agent.codex_responses_adapter import _chat_messages_to_responses_input
items = _chat_messages_to_responses_input(messages)
reasoning_items = [it for it in items if it.get("type") == "reasoning"]
# Dedup: rs_aaa appears in both turns but should only be emitted once.
@ -1299,7 +1311,8 @@ def test_preflight_codex_input_deduplicates_reasoning_ids(monkeypatch):
{"type": "reasoning", "id": "rs_zzz", "encrypted_content": "enc_b"},
{"role": "assistant", "content": "done"},
]
normalized = agent._preflight_codex_input_items(raw_input)
from agent.codex_responses_adapter import _preflight_codex_input_items
normalized = _preflight_codex_input_items(raw_input)
reasoning_items = [it for it in normalized if it.get("type") == "reasoning"]
# rs_xyz duplicate should be collapsed to one item; rs_zzz kept.