diff --git a/agent/codex_responses_adapter.py b/agent/codex_responses_adapter.py index 798ea0856a..c5d6dfcea4 100644 --- a/agent/codex_responses_adapter.py +++ b/agent/codex_responses_adapter.py @@ -227,6 +227,23 @@ def _responses_tools(tools: Optional[List[Dict[str, Any]]] = None) -> Optional[L # Message format conversion # --------------------------------------------------------------------------- +_RESPONSE_MESSAGE_STATUSES = {"completed", "incomplete", "in_progress"} + + +def _normalize_responses_message_status(value: Any, *, default: str = "completed") -> str: + """Normalize a Responses assistant message status for replay. + + The API accepts completed/incomplete/in_progress on replayed assistant + output messages. Preserve those exactly (modulo case/hyphen spelling) so + incomplete Codex continuation turns don't get falsely marked completed. + """ + if isinstance(value, str): + status = value.strip().lower().replace("-", "_").replace(" ", "_") + if status in _RESPONSE_MESSAGE_STATUSES: + return status + return default + + def _chat_messages_to_responses_input(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Convert internal chat-style messages to Responses input items.""" items: List[Dict[str, Any]] = [] @@ -272,7 +289,57 @@ def _chat_messages_to_responses_input(messages: List[Dict[str, Any]]) -> List[Di seen_item_ids.add(item_id) has_codex_reasoning = True - if content_parts: + # Replay exact assistant message items (with id/phase) from + # previous turns so the API can maintain prefix-cache hits. + # OpenAI docs: "preserve and resend phase on all assistant + # messages — dropping it can degrade performance." + codex_message_items = msg.get("codex_message_items") + replayed_message_items = 0 + if isinstance(codex_message_items, list): + for raw_item in codex_message_items: + if not isinstance(raw_item, dict): + continue + if raw_item.get("type") != "message" or raw_item.get("role") != "assistant": + continue + raw_content_parts = raw_item.get("content") + if not isinstance(raw_content_parts, list): + continue + + normalized_content_parts = [] + for part in raw_content_parts: + if not isinstance(part, dict): + continue + part_type = str(part.get("type") or "").strip() + if part_type not in {"output_text", "text"}: + continue + text = part.get("text", "") + if text is None: + text = "" + if not isinstance(text, str): + text = str(text) + normalized_content_parts.append({"type": "output_text", "text": text}) + + if not normalized_content_parts: + continue + + replay_item = { + "type": "message", + "role": "assistant", + "status": _normalize_responses_message_status(raw_item.get("status")), + "content": normalized_content_parts, + } + item_id = raw_item.get("id") + if isinstance(item_id, str) and item_id.strip(): + replay_item["id"] = item_id.strip() + phase = raw_item.get("phase") + if isinstance(phase, str) and phase.strip(): + replay_item["phase"] = phase.strip() + items.append(replay_item) + replayed_message_items += 1 + + if replayed_message_items > 0: + pass + elif content_parts: items.append({"role": "assistant", "content": content_parts}) elif content_text.strip(): items.append({"role": "assistant", "content": content_text}) @@ -432,6 +499,47 @@ def _preflight_codex_input_items(raw_items: Any) -> List[Dict[str, Any]]: normalized.append(reasoning_item) continue + if item_type == "message": + role = item.get("role") + if role != "assistant": + raise ValueError(f"Codex Responses input[{idx}] message items must have role='assistant'.") + content = item.get("content") + if not isinstance(content, list): + raise ValueError(f"Codex Responses input[{idx}] message item must have content list.") + normalized_content = [] + for part_idx, part in enumerate(content): + if not isinstance(part, dict): + raise ValueError( + f"Codex Responses input[{idx}] message content[{part_idx}] must be an object." + ) + part_type = part.get("type") + if part_type not in {"output_text", "text"}: + raise ValueError( + f"Codex Responses input[{idx}] message content[{part_idx}] has unsupported type {part_type!r}." + ) + text = part.get("text", "") + if text is None: + text = "" + if not isinstance(text, str): + text = str(text) + normalized_content.append({"type": "output_text", "text": text}) + if not normalized_content: + raise ValueError(f"Codex Responses input[{idx}] message item must contain at least one text part.") + normalized_item: Dict[str, Any] = { + "type": "message", + "role": "assistant", + "status": _normalize_responses_message_status(item.get("status")), + "content": normalized_content, + } + item_id = item.get("id") + if isinstance(item_id, str) and item_id.strip(): + normalized_item["id"] = item_id.strip() + phase = item.get("phase") + if isinstance(phase, str) and phase.strip(): + normalized_item["phase"] = phase.strip() + normalized.append(normalized_item) + continue + role = item.get("role") if role in {"user", "assistant"}: content = item.get("content", "") @@ -716,6 +824,7 @@ def _normalize_codex_response(response: Any) -> tuple[Any, str]: content_parts: List[str] = [] reasoning_parts: List[str] = [] reasoning_items_raw: List[Dict[str, Any]] = [] + message_items_raw: List[Dict[str, Any]] = [] tool_calls: List[Any] = [] has_incomplete_items = response_status in {"queued", "in_progress", "incomplete"} saw_commentary_phase = False @@ -734,6 +843,7 @@ def _normalize_codex_response(response: Any) -> tuple[Any, str]: if item_type == "message": item_phase = getattr(item, "phase", None) + normalized_phase = None if isinstance(item_phase, str): normalized_phase = item_phase.strip().lower() if normalized_phase in {"commentary", "analysis"}: @@ -743,6 +853,18 @@ def _normalize_codex_response(response: Any) -> tuple[Any, str]: message_text = _extract_responses_message_text(item) if message_text: content_parts.append(message_text) + raw_message_item: Dict[str, Any] = { + "type": "message", + "role": "assistant", + "status": _normalize_responses_message_status(item_status), + "content": [{"type": "output_text", "text": message_text}], + } + item_id = getattr(item, "id", None) + if isinstance(item_id, str) and item_id: + raw_message_item["id"] = item_id + if normalized_phase: + raw_message_item["phase"] = normalized_phase + message_items_raw.append(raw_message_item) elif item_type == "reasoning": reasoning_text = _extract_responses_reasoning_text(item) if reasoning_text: @@ -855,6 +977,7 @@ def _normalize_codex_response(response: Any) -> tuple[Any, str]: reasoning_content=None, reasoning_details=None, codex_reasoning_items=reasoning_items_raw or None, + codex_message_items=message_items_raw or None, ) if tool_calls: diff --git a/agent/transports/__init__.py b/agent/transports/__init__.py index 5752113325..d1c8251ed2 100644 --- a/agent/transports/__init__.py +++ b/agent/transports/__init__.py @@ -23,9 +23,14 @@ def get_transport(api_mode: str): This allows gradual migration — call sites can check for None and fall back to the legacy code path. """ - if not _REGISTRY: - _discover_transports() cls = _REGISTRY.get(api_mode) + if cls is None: + # The registry can be partially populated when a specific transport + # module was imported directly (for example chat_completions before + # codex). Discover on misses, not only when the registry is empty, so + # test/order-dependent imports do not make valid api_modes unavailable. + _discover_transports() + cls = _REGISTRY.get(api_mode) if cls is None: return None return cls() diff --git a/agent/transports/chat_completions.py b/agent/transports/chat_completions.py index 1cccf7e928..34d5caa88a 100644 --- a/agent/transports/chat_completions.py +++ b/agent/transports/chat_completions.py @@ -31,15 +31,15 @@ class ChatCompletionsTransport(ProviderTransport): def convert_messages(self, messages: List[Dict[str, Any]], **kwargs) -> List[Dict[str, Any]]: """Messages are already in OpenAI format — sanitize Codex leaks only. - Strips Codex Responses API fields (``codex_reasoning_items`` on the - message, ``call_id``/``response_item_id`` on tool_calls) that strict - chat-completions providers reject with 400/422. + Strips Codex Responses API fields (``codex_reasoning_items`` / + ``codex_message_items`` on the message, ``call_id``/``response_item_id`` + on tool_calls) that strict chat-completions providers reject with 400/422. """ needs_sanitize = False for msg in messages: if not isinstance(msg, dict): continue - if "codex_reasoning_items" in msg: + if "codex_reasoning_items" in msg or "codex_message_items" in msg: needs_sanitize = True break tool_calls = msg.get("tool_calls") @@ -59,6 +59,7 @@ class ChatCompletionsTransport(ProviderTransport): if not isinstance(msg, dict): continue msg.pop("codex_reasoning_items", None) + msg.pop("codex_message_items", None) tool_calls = msg.get("tool_calls") if isinstance(tool_calls, list): for tc in tool_calls: diff --git a/agent/transports/codex.py b/agent/transports/codex.py index ec48352193..783582d57b 100644 --- a/agent/transports/codex.py +++ b/agent/transports/codex.py @@ -120,6 +120,24 @@ class ResponsesApiTransport(ProviderTransport): if request_overrides: kwargs.update(request_overrides) + if is_codex_backend: + prompt_cache_key = kwargs.get("prompt_cache_key") + cache_scope_id = str(prompt_cache_key or session_id or "").strip() + if cache_scope_id: + existing_extra_headers = kwargs.get("extra_headers") + merged_extra_headers: Dict[str, str] = {} + if isinstance(existing_extra_headers, dict): + merged_extra_headers.update( + { + str(key): str(value) + for key, value in existing_extra_headers.items() + if key and value is not None + } + ) + merged_extra_headers["session_id"] = cache_scope_id + merged_extra_headers["x-client-request-id"] = cache_scope_id + kwargs["extra_headers"] = merged_extra_headers + max_tokens = params.get("max_tokens") if max_tokens is not None and not is_codex_backend: kwargs["max_output_tokens"] = max_tokens @@ -160,6 +178,8 @@ class ResponsesApiTransport(ProviderTransport): provider_data = {} if msg and hasattr(msg, "codex_reasoning_items") and msg.codex_reasoning_items: provider_data["codex_reasoning_items"] = msg.codex_reasoning_items + if msg and hasattr(msg, "codex_message_items") and msg.codex_message_items: + provider_data["codex_message_items"] = msg.codex_message_items if msg and hasattr(msg, "reasoning_details") and msg.reasoning_details: provider_data["reasoning_details"] = msg.reasoning_details diff --git a/agent/transports/types.py b/agent/transports/types.py index 74481f85cd..68a807b47c 100644 --- a/agent/transports/types.py +++ b/agent/transports/types.py @@ -97,7 +97,7 @@ class NormalizedResponse: Response-level ``provider_data`` examples: * Anthropic: ``{"reasoning_details": [...]}`` - * Codex: ``{"codex_reasoning_items": [...]}`` + * Codex: ``{"codex_reasoning_items": [...], "codex_message_items": [...]}`` * Others: ``None`` """ @@ -126,6 +126,11 @@ class NormalizedResponse: pd = self.provider_data or {} return pd.get("codex_reasoning_items") + @property + def codex_message_items(self): + pd = self.provider_data or {} + return pd.get("codex_message_items") + # --------------------------------------------------------------------------- # Factory helpers diff --git a/gateway/session.py b/gateway/session.py index f677432baf..7e4604c0d2 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -1232,6 +1232,7 @@ class SessionStore: reasoning_content=message.get("reasoning_content") if message.get("role") == "assistant" else None, reasoning_details=message.get("reasoning_details") if message.get("role") == "assistant" else None, codex_reasoning_items=message.get("codex_reasoning_items") if message.get("role") == "assistant" else None, + codex_message_items=message.get("codex_message_items") if message.get("role") == "assistant" else None, ) except Exception as e: logger.debug("Session DB operation failed: %s", e) @@ -1264,6 +1265,7 @@ class SessionStore: reasoning_content=msg.get("reasoning_content") if role == "assistant" else None, reasoning_details=msg.get("reasoning_details") if role == "assistant" else None, codex_reasoning_items=msg.get("codex_reasoning_items") if role == "assistant" else None, + codex_message_items=msg.get("codex_message_items") if role == "assistant" else None, ) except Exception as e: logger.debug("Failed to rewrite transcript in DB: %s", e) diff --git a/hermes_state.py b/hermes_state.py index ed95d25f45..8ae8ae6e61 100644 --- a/hermes_state.py +++ b/hermes_state.py @@ -31,7 +31,7 @@ T = TypeVar("T") DEFAULT_DB_PATH = get_hermes_home() / "state.db" -SCHEMA_VERSION = 8 +SCHEMA_VERSION = 9 SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -83,7 +83,8 @@ CREATE TABLE IF NOT EXISTS messages ( reasoning TEXT, reasoning_content TEXT, reasoning_details TEXT, - codex_reasoning_items TEXT + codex_reasoning_items TEXT, + codex_message_items TEXT ); CREATE TABLE IF NOT EXISTS state_meta ( @@ -356,6 +357,15 @@ class SessionDB: except sqlite3.OperationalError: pass # Column already exists cursor.execute("UPDATE schema_version SET version = 8") + if current_version < 9: + # v9: preserve replayable Codex assistant message ids/phases so + # follow-up turns can rebuild Responses API message items instead + # of flattening everything to plain assistant text. + try: + cursor.execute('ALTER TABLE messages ADD COLUMN "codex_message_items" TEXT') + except sqlite3.OperationalError: + pass # Column already exists + cursor.execute("UPDATE schema_version SET version = 9") # Unique title index — always ensure it exists (safe to run after migrations # since the title column is guaranteed to exist at this point) @@ -956,6 +966,7 @@ class SessionDB: reasoning_content: str = None, reasoning_details: Any = None, codex_reasoning_items: Any = None, + codex_message_items: Any = None, ) -> int: """ Append a message to a session. Returns the message row ID. @@ -972,6 +983,10 @@ class SessionDB: json.dumps(codex_reasoning_items) if codex_reasoning_items else None ) + codex_message_items_json = ( + json.dumps(codex_message_items) + if codex_message_items else None + ) tool_calls_json = json.dumps(tool_calls) if tool_calls else None # Pre-compute tool call count @@ -983,8 +998,9 @@ class SessionDB: cursor = conn.execute( """INSERT INTO messages (session_id, role, content, tool_call_id, tool_calls, tool_name, timestamp, token_count, finish_reason, - reasoning, reasoning_content, reasoning_details, codex_reasoning_items) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + reasoning, reasoning_content, reasoning_details, codex_reasoning_items, + codex_message_items) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( session_id, role, @@ -999,6 +1015,7 @@ class SessionDB: reasoning_content, reasoning_details_json, codex_items_json, + codex_message_items_json, ), ) msg_id = cursor.lastrowid @@ -1112,7 +1129,8 @@ class SessionDB: with self._lock: cursor = self._conn.execute( "SELECT role, content, tool_call_id, tool_calls, tool_name, " - "reasoning, reasoning_content, reasoning_details, codex_reasoning_items " + "reasoning, reasoning_content, reasoning_details, codex_reasoning_items, " + "codex_message_items " "FROM messages WHERE session_id = ? ORDER BY timestamp, id", (session_id,), ) @@ -1150,6 +1168,12 @@ class SessionDB: except (json.JSONDecodeError, TypeError): logger.warning("Failed to deserialize codex_reasoning_items, falling back to None") msg["codex_reasoning_items"] = None + if row["codex_message_items"]: + try: + msg["codex_message_items"] = json.loads(row["codex_message_items"]) + except (json.JSONDecodeError, TypeError): + logger.warning("Failed to deserialize codex_message_items, falling back to None") + msg["codex_message_items"] = None messages.append(msg) return messages diff --git a/run_agent.py b/run_agent.py index aae61643f4..370ad97822 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3313,6 +3313,7 @@ class AIAgent: reasoning_content=msg.get("reasoning_content") if role == "assistant" else None, reasoning_details=msg.get("reasoning_details") if role == "assistant" else None, codex_reasoning_items=msg.get("codex_reasoning_items") if role == "assistant" else None, + codex_message_items=msg.get("codex_message_items") if role == "assistant" else None, ) self._last_flushed_db_idx = len(messages) except Exception as e: @@ -7669,6 +7670,13 @@ class AIAgent: if codex_items: msg["codex_reasoning_items"] = codex_items + # Codex Responses API: preserve exact assistant message items (with + # id/phase) so follow-up turns can replay structured items instead of + # flattening to plain text. This is required for prefix cache hits. + codex_message_items = getattr(assistant_message, "codex_message_items", None) + if codex_message_items: + msg["codex_message_items"] = codex_message_items + if assistant_message.tool_calls: tool_calls = [] for tool_call in assistant_message.tool_calls: @@ -11549,16 +11557,26 @@ class AIAgent: interim_has_content = bool((interim_msg.get("content") or "").strip()) interim_has_reasoning = bool(interim_msg.get("reasoning", "").strip()) if isinstance(interim_msg.get("reasoning"), str) else False interim_has_codex_reasoning = bool(interim_msg.get("codex_reasoning_items")) + interim_has_codex_message_items = bool(interim_msg.get("codex_message_items")) - if interim_has_content or interim_has_reasoning or interim_has_codex_reasoning: + if ( + interim_has_content + or interim_has_reasoning + or interim_has_codex_reasoning + or interim_has_codex_message_items + ): last_msg = messages[-1] if messages else None # Duplicate detection: two consecutive incomplete assistant # messages with identical content AND reasoning are collapsed. - # For reasoning-only messages (codex_reasoning_items differ but - # visible content/reasoning are both empty), we also compare - # the encrypted items to avoid silently dropping new state. + # For provider-state-only changes (encrypted reasoning + # items or replayable message ids/phases/statuses differ + # while visible content/reasoning are unchanged), compare + # those opaque payloads too so we don't silently drop the + # newer continuation state. last_codex_items = last_msg.get("codex_reasoning_items") if isinstance(last_msg, dict) else None interim_codex_items = interim_msg.get("codex_reasoning_items") + last_codex_message_items = last_msg.get("codex_message_items") if isinstance(last_msg, dict) else None + interim_codex_message_items = interim_msg.get("codex_message_items") duplicate_interim = ( isinstance(last_msg, dict) and last_msg.get("role") == "assistant" @@ -11566,6 +11584,7 @@ class AIAgent: and (last_msg.get("content") or "") == (interim_msg.get("content") or "") and (last_msg.get("reasoning") or "") == (interim_msg.get("reasoning") or "") and last_codex_items == interim_codex_items + and last_codex_message_items == interim_codex_message_items ) if not duplicate_interim: messages.append(interim_msg) diff --git a/tests/agent/transports/test_chat_completions.py b/tests/agent/transports/test_chat_completions.py index cb8e17c6af..4adf9f72e5 100644 --- a/tests/agent/transports/test_chat_completions.py +++ b/tests/agent/transports/test_chat_completions.py @@ -33,15 +33,18 @@ class TestChatCompletionsBasic: def test_convert_messages_strips_codex_fields(self, transport): msgs = [ {"role": "assistant", "content": "ok", "codex_reasoning_items": [{"id": "rs_1"}], + "codex_message_items": [{"id": "msg_1", "type": "message"}], "tool_calls": [{"id": "call_1", "call_id": "call_1", "response_item_id": "fc_1", "type": "function", "function": {"name": "t", "arguments": "{}"}}]}, ] result = transport.convert_messages(msgs) assert "codex_reasoning_items" not in result[0] + assert "codex_message_items" not in result[0] assert "call_id" not in result[0]["tool_calls"][0] assert "response_item_id" not in result[0]["tool_calls"][0] # Original list untouched (deepcopy-on-demand) assert "codex_reasoning_items" in msgs[0] + assert "codex_message_items" in msgs[0] class TestChatCompletionsBuildKwargs: diff --git a/tests/agent/transports/test_codex_transport.py b/tests/agent/transports/test_codex_transport.py index f97c913af2..d9db3be7c3 100644 --- a/tests/agent/transports/test_codex_transport.py +++ b/tests/agent/transports/test_codex_transport.py @@ -194,6 +194,36 @@ class TestCodexNormalizeResponse: assert nr.content == "Hello world" assert nr.finish_reason == "stop" + def test_message_items_preserved_in_provider_data(self, transport): + """Codex assistant message item ids/phases must survive transport normalization.""" + r = SimpleNamespace( + output=[ + SimpleNamespace( + type="message", + role="assistant", + id="msg_abc", + phase="final_answer", + content=[SimpleNamespace(type="output_text", text="Hello world")], + status="completed", + ), + ], + status="completed", + incomplete_details=None, + usage=SimpleNamespace(input_tokens=10, output_tokens=5, + input_tokens_details=None, output_tokens_details=None), + ) + nr = transport.normalize_response(r) + assert nr.codex_message_items == [ + { + "type": "message", + "role": "assistant", + "status": "completed", + "content": [{"type": "output_text", "text": "Hello world"}], + "id": "msg_abc", + "phase": "final_answer", + } + ] + def test_tool_call_response(self, transport): """Normalize a Codex response with tool calls.""" r = SimpleNamespace( diff --git a/tests/agent/transports/test_transport.py b/tests/agent/transports/test_transport.py index 75b3a2c702..67fb486fc9 100644 --- a/tests/agent/transports/test_transport.py +++ b/tests/agent/transports/test_transport.py @@ -60,6 +60,13 @@ class TestTransportRegistry: assert t is not None assert t.api_mode == "anthropic_messages" + def test_discovers_missing_transport_when_registry_partially_populated(self): + """Importing one transport directly must not hide other valid api_modes.""" + import agent.transports.chat_completions # noqa: F401 + t = get_transport("codex_responses") + assert t is not None + assert t.api_mode == "codex_responses" + def test_register_and_get(self): class DummyTransport(ProviderTransport): @property diff --git a/tests/agent/transports/test_types.py b/tests/agent/transports/test_types.py index dd3aadf1e1..2d576a8f83 100644 --- a/tests/agent/transports/test_types.py +++ b/tests/agent/transports/test_types.py @@ -270,3 +270,15 @@ class TestNormalizedResponseBackwardCompat: def test_codex_reasoning_items_none_when_absent(self): nr = NormalizedResponse(content="hi", tool_calls=None, finish_reason="stop") assert nr.codex_reasoning_items is None + + def test_codex_message_items_from_provider_data(self): + items = [{"id": "msg_1", "type": "message"}] + nr = NormalizedResponse( + content="hi", tool_calls=None, finish_reason="stop", + provider_data={"codex_message_items": items}, + ) + assert nr.codex_message_items == items + + def test_codex_message_items_none_when_absent(self): + nr = NormalizedResponse(content="hi", tool_calls=None, finish_reason="stop") + assert nr.codex_message_items is None diff --git a/tests/run_agent/test_provider_parity.py b/tests/run_agent/test_provider_parity.py index 3b7993c3bc..3b4c69a47b 100644 --- a/tests/run_agent/test_provider_parity.py +++ b/tests/run_agent/test_provider_parity.py @@ -716,6 +716,103 @@ class TestNormalizeCodexResponse: assert len(msg.tool_calls) == 1 assert msg.tool_calls[0].function.name == "web_search" + def test_message_items_captured_with_id_and_phase(self, monkeypatch): + """Exact message items (with id/phase) must be captured for cache replay.""" + agent = self._make_codex_agent(monkeypatch) + response = SimpleNamespace( + output=[ + SimpleNamespace( + type="message", status="completed", id="msg_abc", + phase="commentary", + content=[SimpleNamespace(type="output_text", text="Thinking...")], + ), + SimpleNamespace( + type="message", status="completed", id="msg_def", + phase="final_answer", + content=[SimpleNamespace(type="output_text", text="Done!")], + ), + ], + status="completed", + ) + msg, reason = _normalize_codex_response(response) + assert msg.codex_message_items is not None + assert len(msg.codex_message_items) == 2 + assert msg.codex_message_items[0]["id"] == "msg_abc" + assert msg.codex_message_items[0]["phase"] == "commentary" + assert msg.codex_message_items[0]["content"][0]["text"] == "Thinking..." + assert msg.codex_message_items[1]["id"] == "msg_def" + assert msg.codex_message_items[1]["phase"] == "final_answer" + assert msg.codex_message_items[1]["content"][0]["text"] == "Done!" + + def test_message_items_none_when_no_messages(self, monkeypatch): + """Only reasoning + tool calls should yield None codex_message_items.""" + agent = self._make_codex_agent(monkeypatch) + response = SimpleNamespace( + output=[ + SimpleNamespace(type="function_call", status="completed", + call_id="call_1", name="web_search", arguments='{}', id="fc_1"), + ], + status="completed", + ) + msg, reason = _normalize_codex_response(response) + assert msg.codex_message_items is None + + +class TestChatMessagesToResponsesInputMessageItems: + """Verify codex_message_items are replayed verbatim instead of reconstructed.""" + + def test_replays_exact_message_items(self, monkeypatch): + agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses", + base_url="https://chatgpt.com/backend-api/codex") + messages = [ + { + "role": "assistant", + "content": "Hello world", + "codex_message_items": [ + { + "type": "message", + "role": "assistant", + "status": "completed", + "id": "msg_123", + "phase": "final_answer", + "content": [{"type": "output_text", "text": "Hello world"}], + }, + ], + }, + {"role": "user", "content": "follow up"}, + ] + items = _chat_messages_to_responses_input(messages) + msg_items = [i for i in items if i.get("type") == "message"] + assert len(msg_items) == 1 + assert msg_items[0]["id"] == "msg_123" + assert msg_items[0]["phase"] == "final_answer" + assert msg_items[0]["content"][0]["text"] == "Hello world" + + def test_fallback_to_plain_when_no_message_items(self, monkeypatch): + agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses", + base_url="https://chatgpt.com/backend-api/codex") + messages = [{"role": "assistant", "content": "Hello world"}] + items = _chat_messages_to_responses_input(messages) + assert items == [{"role": "assistant", "content": "Hello world"}] + + def test_skips_invalid_message_items(self, monkeypatch): + agent = _make_agent(monkeypatch, "openai-codex", api_mode="codex_responses", + base_url="https://chatgpt.com/backend-api/codex") + messages = [ + { + "role": "assistant", + "content": "fallback text", + "codex_message_items": [ + {"type": "function_call", "role": "assistant"}, # wrong type + {"type": "message", "role": "user"}, # wrong role + {"type": "message", "role": "assistant", "content": "not a list"}, + ], + }, + ] + items = _chat_messages_to_responses_input(messages) + # All invalid — falls back to plain text reconstruction + assert items == [{"role": "assistant", "content": "fallback text"}] + # ── Chat completions response handling (OpenRouter/Nous) ───────────────────── diff --git a/tests/run_agent/test_run_agent_codex_responses.py b/tests/run_agent/test_run_agent_codex_responses.py index 913a041fbf..b906355900 100644 --- a/tests/run_agent/test_run_agent_codex_responses.py +++ b/tests/run_agent/test_run_agent_codex_responses.py @@ -943,6 +943,33 @@ def test_normalize_codex_response_marks_commentary_only_message_as_incomplete(mo assert "inspect the repository" in (assistant_message.content or "") +def test_normalize_codex_response_preserves_message_status_for_replay(monkeypatch): + """Incomplete Codex output messages must not be replayed as completed.""" + agent = _build_agent(monkeypatch) + from agent.codex_responses_adapter import _normalize_codex_response + + response = SimpleNamespace( + output=[ + SimpleNamespace( + type="message", + id="msg_partial", + phase="commentary", + status="in_progress", + content=[SimpleNamespace(type="output_text", text="Still working...")], + ) + ], + usage=SimpleNamespace(input_tokens=4, output_tokens=2, total_tokens=6), + status="in_progress", + model="gpt-5-codex", + ) + + assistant_message, finish_reason = _normalize_codex_response(response) + + assert finish_reason == "incomplete" + assert assistant_message.codex_message_items[0]["id"] == "msg_partial" + assert assistant_message.codex_message_items[0]["status"] == "in_progress" + + def test_normalize_codex_response_detects_leaked_tool_call_text(monkeypatch): """Harmony-style `to=functions.foo` leaked into assistant content with no structured function_call items must be treated as incomplete so the @@ -1403,6 +1430,44 @@ def test_chat_messages_to_responses_input_reasoning_only_has_following_item(monk assert following.get("role") == "assistant" +def test_codex_message_item_status_survives_conversion_and_preflight(monkeypatch): + """Stored Codex assistant message statuses must survive replay normalization.""" + agent = _build_agent(monkeypatch) + from agent.codex_responses_adapter import ( + _chat_messages_to_responses_input, + _preflight_codex_input_items, + ) + + items = _chat_messages_to_responses_input([ + { + "role": "assistant", + "content": "partial", + "codex_message_items": [ + { + "type": "message", + "role": "assistant", + "status": "incomplete", + "id": "msg_incomplete", + "phase": "commentary", + "content": [{"type": "output_text", "text": "partial"}], + } + ], + } + ]) + replay_item = next(item for item in items if item.get("type") == "message") + assert replay_item["status"] == "incomplete" + + normalized = _preflight_codex_input_items([ + { + "type": "message", + "role": "assistant", + "status": "in_progress", + "content": [{"type": "output_text", "text": "working"}], + } + ]) + assert normalized[0]["status"] == "in_progress" + + def test_duplicate_detection_distinguishes_different_codex_reasoning(monkeypatch): """Two consecutive reasoning-only responses with different encrypted content must NOT be treated as duplicates.""" @@ -1453,6 +1518,58 @@ def test_duplicate_detection_distinguishes_different_codex_reasoning(monkeypatch assert "enc_second" in encrypted_contents +def test_duplicate_detection_distinguishes_different_codex_message_items(monkeypatch): + """Incomplete turns with new message ids/phases/statuses must not be collapsed.""" + agent = _build_agent(monkeypatch) + responses = [ + SimpleNamespace( + output=[ + SimpleNamespace( + type="message", + id="msg_first", + phase="commentary", + status="in_progress", + content=[SimpleNamespace(type="output_text", text="Still working...")], + ) + ], + usage=SimpleNamespace(input_tokens=50, output_tokens=10, total_tokens=60), + status="in_progress", + model="gpt-5-codex", + ), + SimpleNamespace( + output=[ + SimpleNamespace( + type="message", + id="msg_second", + phase="commentary", + status="in_progress", + content=[SimpleNamespace(type="output_text", text="Still working...")], + ) + ], + usage=SimpleNamespace(input_tokens=50, output_tokens=10, total_tokens=60), + status="in_progress", + model="gpt-5-codex", + ), + _codex_message_response("Final answer after progress updates."), + ] + monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: responses.pop(0)) + + result = agent.run_conversation("keep going") + + assert result["completed"] is True + interim_msgs = [ + msg for msg in result["messages"] + if msg.get("role") == "assistant" + and msg.get("finish_reason") == "incomplete" + ] + assert len(interim_msgs) == 2 + assert [msg["codex_message_items"][0]["id"] for msg in interim_msgs] == [ + "msg_first", + "msg_second", + ] + assert all(msg["codex_message_items"][0]["status"] == "in_progress" for msg in interim_msgs) + + def test_chat_messages_to_responses_input_deduplicates_reasoning_ids(monkeypatch): """Duplicate reasoning item IDs across multi-turn incomplete responses must be deduplicated so the Responses API doesn't reject with HTTP 400.""" diff --git a/tests/test_hermes_state.py b/tests/test_hermes_state.py index f405cf8bd5..94cd498a66 100644 --- a/tests/test_hermes_state.py +++ b/tests/test_hermes_state.py @@ -308,6 +308,33 @@ class TestMessageStorage: assert "reasoning_content" in conv[0] assert conv[0]["reasoning_content"] == "" + def test_codex_message_items_persisted_and_restored(self, db): + """codex_message_items must round-trip through JSON serialization.""" + db.create_session(session_id="s1", source="cli") + items = [ + { + "type": "message", + "role": "assistant", + "status": "completed", + "id": "msg_123", + "phase": "commentary", + "content": [{"type": "output_text", "text": "Thinking..."}], + }, + { + "type": "message", + "role": "assistant", + "status": "completed", + "id": "msg_456", + "phase": "final_answer", + "content": [{"type": "output_text", "text": "Done!"}], + }, + ] + db.append_message("s1", role="assistant", content="Done!", codex_message_items=items) + + conv = db.get_messages_as_conversation("s1") + assert len(conv) == 1 + assert conv[0].get("codex_message_items") == items + def test_reasoning_not_set_for_non_assistant(self, db): """reasoning is never leaked onto user or tool messages.""" db.create_session(session_id="s1", source="telegram") @@ -1173,7 +1200,7 @@ class TestSchemaInit: def test_schema_version(self, db): cursor = db._conn.execute("SELECT version FROM schema_version") version = cursor.fetchone()[0] - assert version == 8 + assert version == 9 def test_title_column_exists(self, db): """Verify the title column was created in the sessions table.""" @@ -1229,12 +1256,12 @@ class TestSchemaInit: conn.commit() conn.close() - # Open with SessionDB — should migrate to v8 + # Open with SessionDB — should migrate to v9 migrated_db = SessionDB(db_path=db_path) # Verify migration cursor = migrated_db._conn.execute("SELECT version FROM schema_version") - assert cursor.fetchone()[0] == 8 + assert cursor.fetchone()[0] == 9 # Verify title column exists and is NULL for existing sessions session = migrated_db.get_session("existing") diff --git a/website/docs/developer-guide/session-storage.md b/website/docs/developer-guide/session-storage.md index c214015082..a7868976c0 100644 --- a/website/docs/developer-guide/session-storage.md +++ b/website/docs/developer-guide/session-storage.md @@ -82,8 +82,10 @@ CREATE TABLE IF NOT EXISTS messages ( token_count INTEGER, finish_reason TEXT, reasoning TEXT, + reasoning_content TEXT, reasoning_details TEXT, - codex_reasoning_items TEXT + codex_reasoning_items TEXT, + codex_message_items TEXT ); CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp); @@ -91,7 +93,7 @@ CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestam Notes: - `tool_calls` is stored as a JSON string (serialized list of tool call objects) -- `reasoning_details` and `codex_reasoning_items` are stored as JSON strings +- `reasoning_details`, `codex_reasoning_items`, and `codex_message_items` are stored as JSON strings - `reasoning` stores the raw reasoning text for providers that expose it - Timestamps are Unix epoch floats (`time.time()`) @@ -128,7 +130,7 @@ END; ## Schema Version and Migrations -Current schema version: **6** +Current schema version: **9** The `schema_version` table stores a single integer. On initialization, `_init_schema()` checks the current version and applies migrations sequentially: @@ -141,6 +143,9 @@ The `schema_version` table stores a single integer. On initialization, | 4 | Add unique index on `title` (NULLs allowed, non-NULL must be unique) | | 5 | Add billing columns: `cache_read_tokens`, `cache_write_tokens`, `reasoning_tokens`, `billing_provider`, `billing_base_url`, `billing_mode`, `estimated_cost_usd`, `actual_cost_usd`, `cost_status`, `cost_source`, `pricing_version` | | 6 | Add reasoning columns to messages: `reasoning`, `reasoning_details`, `codex_reasoning_items` | +| 7 | Add `reasoning_content` column to messages | +| 8 | Add `api_call_count` column to sessions | +| 9 | Add `codex_message_items` column to messages for Codex Responses message id/phase replay | Each migration uses `ALTER TABLE ADD COLUMN` wrapped in try/except to handle the column-already-exists case (idempotent). The version number is bumped after