diff --git a/agent/codex_responses_adapter.py b/agent/codex_responses_adapter.py index 13a81ddafdc..e539656b7d1 100644 --- a/agent/codex_responses_adapter.py +++ b/agent/codex_responses_adapter.py @@ -23,6 +23,38 @@ from agent.prompt_builder import DEFAULT_AGENT_IDENTITY logger = logging.getLogger(__name__) +def _classify_responses_issuer( + *, + is_xai_responses: bool = False, + is_github_responses: bool = False, + is_codex_backend: bool = False, + base_url: Optional[str] = None, +) -> str: + """Stable identifier for the Responses endpoint that mints encrypted_content. + + ``reasoning.encrypted_content`` is sealed to the endpoint that issued it: + replaying a Codex-minted blob against xAI (or vice versa) deterministically + returns HTTP 400 ``invalid_encrypted_content``. Stamping the issuer on + persisted reasoning items and filtering at replay time lets a single + conversation switch models without poisoning history with un-decryptable + reasoning blocks. + """ + if is_xai_responses: + return "xai_responses" + if is_github_responses: + return "github_responses" + if is_codex_backend: + return "codex_backend" + if base_url: + return f"other:{base_url}" + return "other" + + +# Throttle the per-process cross-issuer skip warning so we don't flood logs +# when a long history contains many stale-issuer reasoning blocks. +_CROSS_ISSUER_WARN_EMITTED = False + + # Matches Codex/Harmony tool-call serialization that occasionally leaks into # assistant-message content when the model fails to emit a structured # ``function_call`` item. Accepts the common forms: @@ -249,6 +281,7 @@ def _chat_messages_to_responses_input( *, is_xai_responses: bool = False, replay_encrypted_reasoning: bool = True, + current_issuer_kind: Optional[str] = None, ) -> List[Dict[str, Any]]: """Convert internal chat-style messages to Responses input items. 
@@ -270,6 +303,19 @@ def _chat_messages_to_responses_input( ``AIAgent._disable_codex_reasoning_replay`` which both strips cached items from the conversation history and threads ``replay_enabled=False`` through this converter so subsequent turns send no reasoning items. + + ``current_issuer_kind`` enables a per-item cross-issuer guard. The + Responses API's ``encrypted_content`` blob is decryptable only by the + endpoint that minted it — replaying a Codex-issued blob against xAI + (or vice versa) always yields HTTP 400 ``invalid_encrypted_content`` + and breaks every subsequent turn in the same session. When this + argument is provided and a reasoning item carries an ``_issuer_kind`` + stamp from a different endpoint, the item is dropped from the replayed + input. Legacy items without a stamp are still replayed + (backwards-compatible). The two guards compose: + ``replay_encrypted_reasoning=False`` is the session-wide kill switch + (drops ALL replay); ``current_issuer_kind`` is the per-item filter + that runs only when replay is still enabled. """ items: List[Dict[str, Any]] = [] seen_item_ids: set = set() @@ -311,11 +357,40 @@ def _chat_messages_to_responses_input( item_id = ri.get("id") if item_id and item_id in seen_item_ids: continue + # Cross-issuer guard: drop reasoning blocks that + # were minted by a different Responses endpoint. + # The current endpoint cannot decrypt foreign + # encrypted_content and would reject the whole + # request with HTTP 400 invalid_encrypted_content. + # Unstamped (legacy) items pass through. + item_issuer = ri.get("_issuer_kind") + if ( + current_issuer_kind is not None + and item_issuer is not None + and item_issuer != current_issuer_kind + ): + global _CROSS_ISSUER_WARN_EMITTED + if not _CROSS_ISSUER_WARN_EMITTED: + logger.warning( + "Dropping reasoning item minted by %s while " + "calling %s — encrypted_content is sealed to " + "its issuer. 
This happens when a session " + "switches model providers mid-conversation.", + item_issuer, current_issuer_kind, + ) + _CROSS_ISSUER_WARN_EMITTED = True + continue # Strip the "id" field — with store=False the # Responses API cannot look up items by ID and # returns 404. The encrypted_content blob is # self-contained for reasoning chain continuity. - replay_item = {k: v for k, v in ri.items() if k != "id"} + # Also strip the internal "_issuer_kind" stamp; + # it is a Hermes-side metadata key and not part + # of the Responses API schema. + replay_item = { + k: v for k, v in ri.items() + if k not in ("id", "_issuer_kind") + } items.append(replay_item) if item_id: seen_item_ids.add(item_id) @@ -889,8 +964,18 @@ def _extract_responses_reasoning_text(item: Any) -> str: # Full response normalization # --------------------------------------------------------------------------- -def _normalize_codex_response(response: Any) -> tuple[Any, str]: - """Normalize a Responses API object to an assistant_message-like object.""" +def _normalize_codex_response( + response: Any, + *, + issuer_kind: Optional[str] = None, +) -> tuple[Any, str]: + """Normalize a Responses API object to an assistant_message-like object. + + ``issuer_kind`` (when provided) is stamped onto each reasoning item the + response yields, so future replays can detect when the active endpoint + differs from the one that minted the encrypted_content blob and drop + the item instead of triggering HTTP 400 invalid_encrypted_content. 
+ """ output = getattr(response, "output", None) if not isinstance(output, list) or not output: # The Codex backend can return empty output when the answer was @@ -980,6 +1065,12 @@ def _normalize_codex_response(response: Any) -> tuple[Any, str]: encrypted = getattr(item, "encrypted_content", None) if isinstance(encrypted, str) and encrypted: raw_item = {"type": "reasoning", "encrypted_content": encrypted} + # Stamp the issuer so future turns can detect when a + # model swap moved the conversation to an endpoint that + # cannot decrypt this blob — see _chat_messages_to_responses_input + # cross-issuer guard. + if issuer_kind: + raw_item["_issuer_kind"] = issuer_kind item_id = getattr(item, "id", None) if isinstance(item_id, str) and item_id.startswith("rs_tmp_"): logger.debug( diff --git a/agent/transports/codex.py b/agent/transports/codex.py index 664b2eb6a55..f24f2304899 100644 --- a/agent/transports/codex.py +++ b/agent/transports/codex.py @@ -17,19 +17,39 @@ class ResponsesApiTransport(ProviderTransport): Wraps the functions extracted into codex_responses_adapter.py (PR 1). """ + # Issuer kind of the most recent build_kwargs / convert_messages call. + # Used as a fallback when normalize_response is invoked without an + # explicit ``issuer_kind`` kwarg, so reasoning items captured from a + # response are stamped with the endpoint that minted them. Plain class + # attribute default; mutated on the instance, not the class. 
def convert_messages(self, messages: List[Dict[str, Any]], **kwargs) -> Any:
    """Convert OpenAI chat messages to Responses API input items.

    Recognised keyword arguments:
        is_xai_responses, is_github_responses, is_codex_backend, base_url:
            Identify the endpoint the converted input will be sent to.
        replay_encrypted_reasoning: Forwarded to the converter; defaults to
            True.

    When at least one endpoint-identifying kwarg is supplied, the issuer is
    classified from them and stashed on the instance. When none is supplied
    the caller has said nothing about the endpoint, so the previously
    stashed issuer is reused (``None`` if there is none, which leaves the
    cross-issuer filter disabled) instead of classifying the call as
    ``"other"``. Classifying it as ``"other"`` would drop every stamped
    reasoning item and overwrite the stash that ``normalize_response`` relies
    on to stamp the next response.
    """
    from agent.codex_responses_adapter import _chat_messages_to_responses_input

    endpoint_keys = (
        "is_xai_responses",
        "is_github_responses",
        "is_codex_backend",
        "base_url",
    )
    if any(key in kwargs for key in endpoint_keys):
        issuer = self._resolve_issuer_kind(kwargs)
        self._last_issuer_kind = issuer
    else:
        # No endpoint information: keep the stash untouched and reuse it.
        issuer = self._last_issuer_kind
    return _chat_messages_to_responses_input(
        messages,
        is_xai_responses=bool(kwargs.get("is_xai_responses")),
        replay_encrypted_reasoning=bool(
            kwargs.get("replay_encrypted_reasoning", True)
        ),
        current_issuer_kind=issuer,
    )
+ issuer_kind = self._resolve_issuer_kind(params) + self._last_issuer_kind = issuer_kind + # Resolve reasoning effort reasoning_effort = "medium" reasoning_enabled = True @@ -107,6 +135,7 @@ class ResponsesApiTransport(ProviderTransport): payload_messages, is_xai_responses=is_xai_responses, replay_encrypted_reasoning=replay_encrypted_reasoning, + current_issuer_kind=issuer_kind, ), "tools": response_tools, "store": False, @@ -224,8 +253,13 @@ class ResponsesApiTransport(ProviderTransport): _normalize_codex_response, ) + # Issuer for this response = explicit kwarg if the caller knows it, + # otherwise the stash from the matching build_kwargs/convert_messages + # call. Either way it gets stamped onto reasoning items so future + # turns can detect a model swap and drop foreign-issuer blobs. + issuer_kind = kwargs.get("issuer_kind") or self._last_issuer_kind # _normalize_codex_response returns (SimpleNamespace, finish_reason_str) - msg, finish_reason = _normalize_codex_response(response) + msg, finish_reason = _normalize_codex_response(response, issuer_kind=issuer_kind) tool_calls = None if msg and msg.tool_calls: diff --git a/tests/run_agent/test_codex_xai_oauth_recovery.py b/tests/run_agent/test_codex_xai_oauth_recovery.py index 2e0d0709521..170dabb3069 100644 --- a/tests/run_agent/test_codex_xai_oauth_recovery.py +++ b/tests/run_agent/test_codex_xai_oauth_recovery.py @@ -914,3 +914,171 @@ def test_grok_4_still_resolves_to_256k(): # must be "grok-4" (or a more specific variant family if one is # ever added). The 256k contract must hold. assert DEFAULT_CONTEXT_LENGTHS[matched_key] == 256_000 + + +# --------------------------------------------------------------------------- +# Cross-issuer reasoning replay guard +# +# When a session switches model providers mid-conversation (e.g. user runs +# /model gpt-5.5 after several turns on grok-4.3), the persisted reasoning +# items carry encrypted_content that only the issuing endpoint can decrypt. 
+# Replaying them against the new endpoint deterministically returns HTTP 400 +# invalid_encrypted_content and breaks every subsequent turn. The cross-issuer +# guard stamps each reasoning item with its issuer on normalize and drops +# foreign-issuer items on replay. +# --------------------------------------------------------------------------- + + +def _stamped_assistant_msg(issuer_kind, *, text="hi", encrypted="enc_blob", rs_id="rs_001"): + return { + "role": "assistant", + "content": text, + "codex_reasoning_items": [ + { + "type": "reasoning", + "id": rs_id, + "encrypted_content": encrypted, + "summary": [], + "_issuer_kind": issuer_kind, + } + ], + } + + +def test_cross_issuer_reasoning_is_dropped_on_replay(): + """Reasoning minted by one Responses endpoint must not be replayed to + another. This is the regression for the chatgpt-backend vs xAI-OAuth + swap that returned invalid_encrypted_content on every turn after the + user changed model mid-session. + """ + from agent.codex_responses_adapter import _chat_messages_to_responses_input + + msgs = [ + {"role": "user", "content": "hi"}, + _stamped_assistant_msg("xai_responses", encrypted="grok_blob"), + {"role": "user", "content": "next"}, + ] + + # Calling against codex_backend — the grok-issued blob must be dropped. + items = _chat_messages_to_responses_input( + msgs, current_issuer_kind="codex_backend" + ) + reasoning = [it for it in items if it.get("type") == "reasoning"] + assert reasoning == [], ( + "Reasoning items stamped with a foreign _issuer_kind must be dropped " + "before the API rejects the whole request with invalid_encrypted_content." + ) + + +def test_same_issuer_reasoning_is_still_replayed(): + """Same-endpoint reasoning replay is the documented happy path (May 2026 + reversal). The cross-issuer guard must not regress it. 
def test_unstamped_reasoning_is_replayed_for_backwards_compat():
    """Legacy reasoning items (no ``_issuer_kind`` stamp) must still replay.

    Items persisted before the issuer stamp existed carry no
    ``_issuer_kind``; the cross-issuer filter must let them through even
    when a current issuer is supplied.
    """
    from agent.codex_responses_adapter import _chat_messages_to_responses_input

    legacy_item = {
        "type": "reasoning",
        "id": "rs_legacy",
        "encrypted_content": "legacy_blob",
        "summary": [],
    }
    history = [
        {"role": "user", "content": "hi"},
        {
            "role": "assistant",
            "content": "hello",
            "codex_reasoning_items": [legacy_item],
        },
        {"role": "user", "content": "next"},
    ]

    converted = _chat_messages_to_responses_input(
        history, current_issuer_kind="codex_backend"
    )
    replayed = [entry for entry in converted if entry.get("type") == "reasoning"]
    assert len(replayed) == 1
    assert replayed[0]["encrypted_content"] == "legacy_blob"
def test_transport_round_trip_drops_foreign_reasoning():
    """End-to-end check through the transport's ``build_kwargs``.

    History contains a reasoning item stamped ``xai_responses``; building a
    request for the Codex backend must yield an ``input`` list with no
    reasoning items at all.
    """
    from agent.transports.codex import ResponsesApiTransport

    history = [
        {"role": "system", "content": "you are hermes"},
        {"role": "user", "content": "hi"},
        _stamped_assistant_msg("xai_responses", encrypted="grok_blob"),
        {"role": "user", "content": "엑스다임 프로젝트 파악, 스킬로 정리."},
    ]

    request = ResponsesApiTransport().build_kwargs(
        model="gpt-5.5",
        messages=history,
        tools=None,
        is_codex_backend=True,
        is_xai_responses=False,
        is_github_responses=False,
        base_url="https://chatgpt.com/backend-api/codex",
        instructions="you are hermes",
    )

    leaked = [entry for entry in request["input"] if entry.get("type") == "reasoning"]
    assert leaked == [], (
        "Cross-issuer reasoning leaked through build_kwargs — this is the "
        "exact regression that broke session 40de1ae0 on 2026-05-25 01:09."
    )