hermes-agent/tests/run_agent/test_codex_app_server_integration.py
Teknium 12f755c9eb
fix(codex-runtime): retire wedged sessions + post-tool watchdog + OAuth refresh classify (#25769)
Mirrors openclaw beta.8's app-server resilience fixes so a stuck codex
subprocess can't burn the full turn deadline and so users get a
`codex login` pointer instead of raw RPC errors when their token expires.

- TurnResult.should_retire signals the caller to drop+respawn codex.
- Deadline-hit path and dead-subprocess detection set should_retire so
  the next turn doesn't ride a CPU-spinning or auth-broken process.
- Post-tool watchdog (post_tool_quiet_timeout=90s): if a tool item
  completes and codex goes silent past the threshold without further
  output or turn/completed, fast-fail instead of waiting the full 600s.
  Resets on any non-tool activity so normal think-after-tool flows are
  not affected.
- <turn_aborted> and <turn_aborted/> in agent text are treated as
  terminal — some codex builds tear down a turn that way without
  emitting turn/completed.
- _classify_oauth_failure() inspects RPC error message + stderr tail
  for invalid_grant / token refresh / 401 / etc. and rewrites
  user-facing errors to 'run codex login'. Conservative: generic
  failures still surface verbatim. Fires at turn/start failure,
  turn/completed failure, and dead-subprocess paths.
- thread/start cross-fill: tolerate thread.id, thread.sessionId,
  top-level sessionId/threadId so future codex schema drift doesn't
  KeyError us at handshake.
- run_agent.py: when run_turn returns should_retire=True OR raises,
  close + null self._codex_session so the next turn respawns.

Tests: +30 cases across session + integration suites.
  tests/agent/transports/test_codex_app_server_session.py 50/50 pass
  tests/run_agent/test_codex_app_server_integration.py 27/27 pass
  Broader codex scope (transports + cli runtime/migration) 376/376 pass
2026-05-14 07:55:09 -07:00

418 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Integration test for the codex_app_server runtime path through AIAgent.
Verifies that:
- api_mode='codex_app_server' is accepted on AIAgent construction
- run_conversation() takes the early-return path and never enters the
chat completions loop
- Projected messages from a fake Codex session land in the messages list
- tool_iterations from the codex session tick the skill nudge counter
- Memory nudge counter ticks once per turn
- The returned dict has the same shape as the chat_completions path
"""
from __future__ import annotations
from unittest.mock import patch
import pytest
import run_agent
from agent.transports.codex_app_server_session import CodexAppServerSession, TurnResult
@pytest.fixture
def fake_session(monkeypatch):
"""Replace CodexAppServerSession with a stub that returns a fixed
TurnResult, so we can drive AIAgent without spawning real codex."""
def fake_run_turn(self, user_input: str, **kwargs):
return TurnResult(
final_text=f"echo: {user_input}",
projected_messages=[
{"role": "assistant", "content": None,
"tool_calls": [{"id": "exec_1", "type": "function",
"function": {"name": "exec_command",
"arguments": "{}"}}]},
{"role": "tool", "tool_call_id": "exec_1", "content": "ok"},
{"role": "assistant", "content": f"echo: {user_input}"},
],
tool_iterations=1,
interrupted=False,
error=None,
turn_id="turn-stub-1",
thread_id="thread-stub-1",
)
monkeypatch.setattr(CodexAppServerSession, "run_turn", fake_run_turn)
monkeypatch.setattr(
CodexAppServerSession, "ensure_started", lambda self: "thread-stub-1"
)
def _make_codex_agent():
"""Construct an AIAgent in codex_app_server mode without contacting any
real provider. We pass api_mode explicitly so the constructor takes the
fast path for direct credentials."""
return run_agent.AIAgent(
api_key="stub",
base_url="https://stub.invalid",
provider="openai",
api_mode="codex_app_server",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
class TestApiModeAccepted:
def test_api_mode_is_codex_app_server(self):
agent = _make_codex_agent()
assert agent.api_mode == "codex_app_server"
class TestRunConversationCodexPath:
def test_run_conversation_returns_codex_shape(self, fake_session):
agent = _make_codex_agent()
# No background review fork during tests
with patch.object(agent, "_spawn_background_review", return_value=None):
result = agent.run_conversation("hello there")
assert result["final_response"] == "echo: hello there"
assert result["completed"] is True
assert result["partial"] is False
assert result["error"] is None
assert result["api_calls"] == 1
assert result["codex_thread_id"] == "thread-stub-1"
assert result["codex_turn_id"] == "turn-stub-1"
def test_projected_messages_are_spliced(self, fake_session):
agent = _make_codex_agent()
with patch.object(agent, "_spawn_background_review", return_value=None):
result = agent.run_conversation("hello")
msgs = result["messages"]
# User message + 3 projected (assistant tool_call + tool + assistant text)
assert len(msgs) >= 4
assert msgs[0]["role"] == "user"
assert msgs[0]["content"] == "hello"
# Last assistant message has the final text
final = [m for m in msgs if m.get("role") == "assistant"
and m.get("content") == "echo: hello"]
assert final, f"expected final assistant message in {msgs}"
def test_nudge_counters_tick(self, fake_session):
"""The skill nudge counter must accumulate tool_iterations across
turns. The memory nudge counter is gated on memory being configured
(which we skip via skip_memory=True), so we don't assert on it here —
a separate test below covers that path explicitly."""
agent = _make_codex_agent()
agent._iters_since_skill = 0
agent._user_turn_count = 0
with patch.object(agent, "_spawn_background_review", return_value=None):
agent.run_conversation("first")
assert agent._iters_since_skill == 1 # one tool_iteration in fake turn
# _user_turn_count is incremented by run_conversation pre-loop, not
# by the codex helper — confirms we delegate that to the standard flow.
assert agent._user_turn_count == 1
with patch.object(agent, "_spawn_background_review", return_value=None):
agent.run_conversation("second")
assert agent._iters_since_skill == 2
assert agent._user_turn_count == 2
def test_user_message_not_duplicated(self, fake_session):
"""Regression guard: the user message must appear exactly once in
the messages list. The standard run_conversation pre-loop appends
it, and the codex helper must NOT append again."""
agent = _make_codex_agent()
with patch.object(agent, "_spawn_background_review", return_value=None):
result = agent.run_conversation("ping unique 12345")
user_count = sum(
1 for m in result["messages"]
if m.get("role") == "user" and m.get("content") == "ping unique 12345"
)
assert user_count == 1, f"user message appeared {user_count}× in {result['messages']}"
def test_background_review_NOT_invoked_below_threshold(self, fake_session):
"""A single turn shouldn't trigger background review — counters
haven't reached the nudge interval (default 10)."""
agent = _make_codex_agent()
agent._memory_nudge_interval = 10
agent._skill_nudge_interval = 10
agent._iters_since_skill = 0
with patch.object(agent, "_spawn_background_review",
return_value=None) as spawn:
agent.run_conversation("ping")
# Below threshold → review should NOT fire (was a real bug:
# the helper was calling _spawn_background_review() with no
# args after every turn, which would crash with TypeError).
assert not spawn.called
def test_background_review_skill_trigger_fires_above_threshold(
self, monkeypatch
):
"""When tool iterations cross the skill nudge interval, the
background review fires with review_skills=True and the right
messages_snapshot signature."""
from agent.transports.codex_app_server_session import (
CodexAppServerSession, TurnResult,
)
# Make the fake session report 10 tool iterations in one turn
# (matching the default skill threshold).
def fake_run_turn(self, user_input: str, **kwargs):
return TurnResult(
final_text=f"echo: {user_input}",
projected_messages=[
{"role": "assistant", "content": f"echo: {user_input}"},
],
tool_iterations=10,
turn_id="t1", thread_id="th1",
)
monkeypatch.setattr(CodexAppServerSession, "run_turn", fake_run_turn)
monkeypatch.setattr(
CodexAppServerSession, "ensure_started", lambda self: "th1"
)
agent = _make_codex_agent()
agent._skill_nudge_interval = 10
agent._iters_since_skill = 0
# Make valid_tool_names include 'skill_manage' so the gate passes
agent.valid_tool_names = set(getattr(agent, "valid_tool_names", set()))
agent.valid_tool_names.add("skill_manage")
with patch.object(agent, "_spawn_background_review",
return_value=None) as spawn:
agent.run_conversation("do tool work")
assert spawn.called, "skill threshold tripped but review didn't fire"
# Verify the call signature matches what _spawn_background_review
# actually expects — this is the regression guard for the original
# bug where the codex path called it with no args at all.
call = spawn.call_args
assert "messages_snapshot" in call.kwargs
assert isinstance(call.kwargs["messages_snapshot"], list)
assert call.kwargs["review_skills"] is True
# Counter should be reset after the review fires
assert agent._iters_since_skill == 0
def test_background_review_signature_never_breaks(self, fake_session):
"""Even when no trigger fires, the helper must never call
_spawn_background_review with the wrong signature. Run a turn,
then run another turn after manually tripping the skill counter
and confirm the call shape is the kwargs-only form the function
actually accepts."""
agent = _make_codex_agent()
agent._skill_nudge_interval = 1 # very low so any iter trips it
agent._iters_since_skill = 0
agent.valid_tool_names = set(getattr(agent, "valid_tool_names", set()))
agent.valid_tool_names.add("skill_manage")
with patch.object(agent, "_spawn_background_review",
return_value=None) as spawn:
agent.run_conversation("first")
# The fake session reports tool_iterations=1, which trips
# _skill_nudge_interval=1. So review should fire.
assert spawn.called
# Critical invariant: positional args must be empty, all real
# args must be kwargs (matching _spawn_background_review's
# actual signature).
call = spawn.call_args
assert call.args == (), (
f"expected no positional args, got {call.args!r}"
"would crash _spawn_background_review at runtime"
)
assert "messages_snapshot" in call.kwargs
def test_chat_completions_loop_is_not_entered(self, fake_session):
"""The early-return must bypass the regular API call loop entirely.
We confirm by patching the SDK call and asserting it's never invoked."""
agent = _make_codex_agent()
# The chat_completions loop calls self.client.chat.completions.create(...)
# If our early-return works, that path is dead.
with patch.object(agent, "client") as client_mock, patch.object(
agent, "_spawn_background_review", return_value=None
):
agent.run_conversation("hi")
assert not client_mock.chat.completions.create.called
class TestReviewForkApiModeDowngrade:
"""When the parent agent runs on codex_app_server, the background
review fork must downgrade to codex_responses — otherwise the fork
can't dispatch agent-loop tools (memory, skill_manage) which is the
whole point of the review."""
def test_codex_app_server_parent_downgrades_review_fork(self):
"""Live test against the real _spawn_background_review code path:
verify the review_agent gets api_mode=codex_responses when the
parent is codex_app_server."""
from unittest.mock import MagicMock, patch as _patch
agent = _make_codex_agent()
# Pretend memory + skills are configured so the review fork
# reaches the AIAgent constructor.
agent._memory_store = MagicMock()
agent._memory_enabled = True
agent._user_profile_enabled = True
# Mock _current_main_runtime to return the parent's codex_app_server
# state so we can confirm the helper detects + downgrades it.
agent._current_main_runtime = lambda: {
"api_mode": "codex_app_server",
"base_url": "https://chatgpt.com/backend-api/codex",
"api_key": "stub-token",
}
# Capture what AIAgent gets constructed with inside the helper.
captured = {}
def _capture_init(self, **kwargs):
captured.update(kwargs)
# Set bare attributes the rest of the spawn function reads
# so it can finish without exploding.
self.api_mode = kwargs.get("api_mode")
self.provider = kwargs.get("provider")
self.model = kwargs.get("model")
self._memory_write_origin = None
self._memory_write_context = None
self._memory_store = None
self._memory_enabled = False
self._user_profile_enabled = False
self._memory_nudge_interval = 0
self._skill_nudge_interval = 0
self.suppress_status_output = False
self._session_messages = []
def _no_op_run_conv(*a, **kw):
return {"final_response": "", "messages": []}
self.run_conversation = _no_op_run_conv
def _no_op_close(*a, **kw):
return None
self.close = _no_op_close
with _patch("run_agent.AIAgent.__init__", _capture_init):
agent._spawn_background_review(
messages_snapshot=[{"role": "user", "content": "x"}],
review_memory=True,
review_skills=False,
)
# Wait for the spawned thread to actually execute
import time
for _ in range(30):
if "api_mode" in captured:
break
time.sleep(0.1)
assert captured.get("api_mode") == "codex_responses", (
f"review fork should be downgraded to codex_responses when "
f"parent is codex_app_server; got {captured.get('api_mode')!r}"
)
class TestErrorHandling:
def test_session_exception_returns_partial_with_error(self, monkeypatch):
def boom_run_turn(self, user_input, **kwargs):
raise RuntimeError("subprocess died")
monkeypatch.setattr(CodexAppServerSession, "ensure_started",
lambda self: "t1")
monkeypatch.setattr(CodexAppServerSession, "run_turn", boom_run_turn)
agent = _make_codex_agent()
with patch.object(agent, "_spawn_background_review", return_value=None):
result = agent.run_conversation("hi")
assert result["completed"] is False
assert result["partial"] is True
assert "subprocess died" in result["error"]
assert "codex-runtime auto" in result["final_response"]
def test_interrupted_turn_marked_partial(self, monkeypatch):
def interrupted_turn(self, user_input, **kwargs):
return TurnResult(
final_text="",
projected_messages=[],
tool_iterations=0,
interrupted=True,
error="user interrupted",
turn_id="t",
thread_id="th",
)
monkeypatch.setattr(CodexAppServerSession, "ensure_started",
lambda self: "th")
monkeypatch.setattr(CodexAppServerSession, "run_turn", interrupted_turn)
agent = _make_codex_agent()
with patch.object(agent, "_spawn_background_review", return_value=None):
result = agent.run_conversation("hi")
assert result["completed"] is False
assert result["partial"] is True
assert result["error"] == "user interrupted"
class TestSessionRetirementOnRunAgent:
"""run_agent.py side: when run_turn returns should_retire=True, the
AIAgent must close + null _codex_session so the next turn respawns."""
def test_should_retire_drops_session(self, monkeypatch):
closes = {"count": 0}
def fake_run_turn(self, user_input, **kwargs):
return TurnResult(
final_text="",
projected_messages=[],
tool_iterations=0,
interrupted=True,
error="turn timed out after 600.0s",
turn_id="tu1",
thread_id="th1",
should_retire=True,
)
def fake_close(self):
closes["count"] += 1
monkeypatch.setattr(CodexAppServerSession, "ensure_started",
lambda self: "th1")
monkeypatch.setattr(CodexAppServerSession, "run_turn", fake_run_turn)
monkeypatch.setattr(CodexAppServerSession, "close", fake_close)
agent = _make_codex_agent()
with patch.object(agent, "_spawn_background_review", return_value=None):
result = agent.run_conversation("hi")
# The session was closed and cleared
assert closes["count"] == 1
assert getattr(agent, "_codex_session", "MISSING") is None
# Partial result was still returned (caller still sees the error)
assert result["partial"] is True
assert result["error"] == "turn timed out after 600.0s"
def test_normal_turn_keeps_session(self, fake_session):
"""fake_session fixture returns should_retire=False (default).
The session must stay attached for the next turn to reuse."""
agent = _make_codex_agent()
with patch.object(agent, "_spawn_background_review", return_value=None):
agent.run_conversation("hi")
# Session was lazily created and still attached.
assert getattr(agent, "_codex_session", None) is not None
def test_exception_path_also_drops_session(self, monkeypatch):
"""Even if run_turn raises (not just sets should_retire), we must
drop the session — a thrown exception is the strongest possible
signal the process is dead."""
closes = {"count": 0}
def boom_run_turn(self, user_input, **kwargs):
raise RuntimeError("codex segfaulted")
def fake_close(self):
closes["count"] += 1
monkeypatch.setattr(CodexAppServerSession, "ensure_started",
lambda self: "th1")
monkeypatch.setattr(CodexAppServerSession, "run_turn", boom_run_turn)
monkeypatch.setattr(CodexAppServerSession, "close", fake_close)
agent = _make_codex_agent()
with patch.object(agent, "_spawn_background_review", return_value=None):
result = agent.run_conversation("hi")
assert closes["count"] == 1
assert agent._codex_session is None
assert result["completed"] is False
assert "codex segfaulted" in result["error"]