fix(codex-runtime): retire wedged sessions + post-tool watchdog + OAuth refresh classify (#25769)

Mirrors openclaw beta.8's app-server resilience fixes so a stuck codex
subprocess can't burn the full turn deadline and so users get a
`codex login` pointer instead of raw RPC errors when their token expires.

- TurnResult.should_retire signals the caller to drop+respawn codex.
- Deadline-hit path and dead-subprocess detection set should_retire so
  the next turn doesn't ride a CPU-spinning or auth-broken process.
- Post-tool watchdog (post_tool_quiet_timeout=90s): if a tool item
  completes and codex goes silent past the threshold without further
  output or turn/completed, fast-fail instead of waiting the full 600s.
  Resets on any non-tool activity so normal think-after-tool flows are
  not affected.
- <turn_aborted> and <turn_aborted/> in agent text are treated as
  terminal — some codex builds tear down a turn that way without
  emitting turn/completed.
- _classify_oauth_failure() inspects RPC error message + stderr tail
  for invalid_grant / token refresh / 401 / etc. and rewrites
  user-facing errors to 'run codex login'. Conservative: generic
  failures still surface verbatim. Fires at turn/start failure,
  turn/completed failure, and dead-subprocess paths.
- thread/start cross-fill: tolerate thread.id, thread.sessionId,
  top-level sessionId/threadId so future codex schema drift doesn't
  KeyError us at handshake.
- run_agent.py: when run_turn returns should_retire=True OR raises,
  close + null self._codex_session so the next turn respawns.

Tests: +30 cases across session + integration suites.
  tests/agent/transports/test_codex_app_server_session.py 50/50 pass
  tests/run_agent/test_codex_app_server_integration.py 27/27 pass
  Broader codex scope (transports + cli runtime/migration) 376/376 pass
This commit is contained in:
Teknium 2026-05-14 07:55:09 -07:00 committed by GitHub
parent 63991bbd97
commit 12f755c9eb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 711 additions and 7 deletions

View file

@ -84,6 +84,14 @@ class FakeClient:
def close(self):
self._closed = True
def is_alive(self) -> bool:
# Fake is "alive" until close() is called; tests that want a dead
# subprocess can patch this attribute or call close() directly.
return not self._closed
def stderr_tail(self, n: int = 20):
return list(getattr(self, "_stderr_tail", []))[-n:]
# Test helpers
def queue_notification(self, method: str, **params):
self._notifications.append({"method": method, "params": params})
@ -91,6 +99,10 @@ class FakeClient:
def queue_server_request(self, method: str, request_id: Any = "srv-1", **params):
self._server_requests.append({"id": request_id, "method": method, "params": params})
def set_stderr_tail(self, lines):
"""Test helper: seed stderr_tail() output for OAuth-refresh classifier tests."""
self._stderr_tail = list(lines)
def make_session(client: FakeClient, **kwargs) -> CodexAppServerSession:
return CodexAppServerSession(
@ -500,3 +512,385 @@ class TestApprovalPromptEnrichment:
s.run_turn("hi", turn_timeout=1.0)
# Falls back to the reason
assert "apply some changes" in captured["command"]
# ---- openclaw beta.8 parity: retire/wedge/oauth/abort marker ----
class TestSessionRetirement:
"""Mirrors openclaw beta.8's resilience fixes:
- retire timed-out app-server clients (should_retire on deadline)
- post-tool completion watchdog (don't burn the full deadline after a
tool result if codex goes silent)
- <turn_aborted> raw marker as terminal (don't wait for turn/completed
that never comes)
- OAuth refresh failure classification (suggest `codex login` instead
of raw RPC error strings)
- dead subprocess detection between iterations
"""
def test_deadline_marks_session_for_retirement(self):
client = FakeClient()
s = make_session(client)
r = s.run_turn(
"never finishes",
turn_timeout=0.05,
notification_poll_timeout=0.01,
)
assert r.interrupted is True
assert r.error and "timed out" in r.error
assert r.should_retire is True, (
"Deadline exhaustion must signal retirement so the next turn "
"respawns codex instead of riding a wedged subprocess."
)
def test_completed_turn_does_not_retire(self):
client = FakeClient()
client.queue_notification(
"item/completed",
item={"type": "agentMessage", "id": "m1", "text": "hi"},
threadId="t", turnId="tu1",
)
client.queue_notification(
"turn/completed", threadId="t",
turn={"id": "tu1", "status": "completed", "error": None},
)
s = make_session(client)
r = s.run_turn("hi", turn_timeout=1.0)
assert r.should_retire is False
def test_post_tool_quiet_watchdog_trips_and_retires(self):
client = FakeClient()
# One tool completion, then total silence — no further events,
# no turn/completed. With a tiny post_tool_quiet_timeout the
# watchdog must fire before the larger turn deadline.
client.queue_notification(
"item/completed",
item={
"type": "commandExecution", "id": "ex1",
"command": "echo hi", "cwd": "/tmp",
"status": "completed", "aggregatedOutput": "hi",
"exitCode": 0, "commandActions": [],
},
threadId="t", turnId="tu1",
)
s = make_session(client)
r = s.run_turn(
"tool then silence",
turn_timeout=5.0, # would be miserable to wait
notification_poll_timeout=0.02,
post_tool_quiet_timeout=0.15,
)
assert r.interrupted is True
assert r.should_retire is True
assert r.error and "silent" in r.error
# Confirm we issued turn/interrupt to free codex compute
assert any(method == "turn/interrupt" for (method, _) in client.requests)
def test_post_tool_watchdog_resets_on_further_activity(self):
"""A tool completion followed by an agent message should NOT trip
the watchdog further activity = codex still alive."""
client = FakeClient()
client.queue_notification(
"item/completed",
item={
"type": "commandExecution", "id": "ex1",
"command": "echo hi", "cwd": "/tmp",
"status": "completed", "aggregatedOutput": "hi",
"exitCode": 0, "commandActions": [],
},
threadId="t", turnId="tu1",
)
# Non-tool activity immediately after — resets watchdog.
client.queue_notification(
"item/completed",
item={"type": "agentMessage", "id": "m1", "text": "tool finished"},
threadId="t", turnId="tu1",
)
client.queue_notification(
"turn/completed", threadId="t",
turn={"id": "tu1", "status": "completed", "error": None},
)
s = make_session(client)
r = s.run_turn(
"tool then talk", turn_timeout=2.0,
notification_poll_timeout=0.01,
post_tool_quiet_timeout=0.05,
)
# Tool ran, then text reset the watchdog, then turn/completed.
# Should NOT be a retirement case.
assert r.tool_iterations == 1
assert r.final_text == "tool finished"
assert r.should_retire is False
assert r.interrupted is False
def test_turn_aborted_marker_in_text_is_terminal(self):
"""If codex emits `<turn_aborted>` in agent text and never sends
turn/completed, we still exit promptly instead of burning the
deadline."""
client = FakeClient()
client.queue_notification(
"item/completed",
item={
"type": "agentMessage", "id": "m1",
"text": "partial output... <turn_aborted>",
},
threadId="t", turnId="tu1",
)
# Deliberately NO turn/completed notification queued.
s = make_session(client)
r = s.run_turn(
"abort mid-turn", turn_timeout=2.0,
notification_poll_timeout=0.01,
)
assert r.interrupted is True
assert r.error and "turn_aborted" in r.error
# Should have exited fast — not waited for the full 2s deadline.
# (Can't measure wall clock reliably in CI; presence of the marker
# error string instead of a "timed out" message is the proxy.)
assert "timed out" not in r.error
def test_turn_aborted_self_closing_marker_also_terminal(self):
client = FakeClient()
client.queue_notification(
"item/completed",
item={"type": "agentMessage", "id": "m1",
"text": "<turn_aborted/>"},
threadId="t", turnId="tu1",
)
s = make_session(client)
r = s.run_turn("x", turn_timeout=2.0,
notification_poll_timeout=0.01)
assert r.interrupted is True
assert r.error and "turn_aborted" in r.error
def test_oauth_refresh_failure_on_turn_start_suggests_login(self):
from agent.transports.codex_app_server import CodexAppServerError
client = FakeClient()
def boom(method, params):
if method == "turn/start":
raise CodexAppServerError(
code=-32603,
message="auth refresh failed: invalid_grant",
)
return {"thread": {"id": "t"},
"activePermissionProfile": {"id": "x"}}
client._request_handler = boom
s = make_session(client)
r = s.run_turn("hi", turn_timeout=1.0)
assert r.error is not None
assert "codex login" in r.error
assert r.should_retire is True
def test_oauth_failure_from_stderr_on_turn_start_failure(self):
"""If the RPC error itself is opaque but stderr shows an auth
problem, we still classify it as a refresh failure."""
from agent.transports.codex_app_server import CodexAppServerError
client = FakeClient()
client.set_stderr_tail([
"[2026-05-14T10:00:00Z WARN codex_core::auth] token refresh failed",
"[2026-05-14T10:00:00Z ERROR codex_core] please log in again",
])
def boom(method, params):
if method == "turn/start":
raise CodexAppServerError(code=-32603, message="rpc broke")
return {"thread": {"id": "t"},
"activePermissionProfile": {"id": "x"}}
client._request_handler = boom
s = make_session(client)
r = s.run_turn("hi", turn_timeout=1.0)
assert r.error is not None
assert "codex login" in r.error
assert r.should_retire is True
def test_oauth_failure_in_turn_completed_error(self):
"""A failed turn/completed whose error mentions auth/refresh
triggers the re-auth hint + retirement."""
client = FakeClient()
client.queue_notification(
"turn/completed", threadId="t",
turn={
"id": "tu1", "status": "failed",
"error": {"message": "401 Unauthorized: please reauthenticate"},
},
)
s = make_session(client)
r = s.run_turn("x", turn_timeout=1.0,
notification_poll_timeout=0.01)
assert r.error is not None
assert "codex login" in r.error
assert r.should_retire is True
def test_generic_turn_failure_does_not_trigger_oauth_hint(self):
"""A boring model error must NOT rewrite the message into a fake
re-auth hint. Conservative classifier."""
client = FakeClient()
client.queue_notification(
"turn/completed", threadId="t",
turn={
"id": "tu1", "status": "failed",
"error": {"message": "rate limit exceeded"},
},
)
s = make_session(client)
r = s.run_turn("x", turn_timeout=1.0,
notification_poll_timeout=0.01)
assert r.error is not None
assert "codex login" not in r.error
assert "rate limit exceeded" in r.error
# Generic model failures don't retire — the session itself is fine
assert r.should_retire is False
def test_dead_subprocess_detected_between_iterations(self):
"""If codex dies (segfault, OOM, killed by its auth refresh
thread), the inter-iteration is_alive check breaks the loop
instead of waiting on a queue that will never fill."""
client = FakeClient()
s = make_session(client)
s.ensure_started()
# Simulate subprocess death by setting _closed (FakeClient's
# is_alive returns False when closed).
client._closed = True
client.set_stderr_tail([
"thread 'tokio-runtime-worker' panicked at 'oauth: invalid_grant'",
])
r = s.run_turn("x", turn_timeout=2.0,
notification_poll_timeout=0.01)
assert r.should_retire is True
# Stderr-derived auth hint takes precedence over generic message
assert r.error and "codex login" in r.error
# ---- thread/start cross-fill ----
class TestThreadStartCrossFill:
"""Mirrors openclaw beta.8's tolerance for thread.id/sessionId aliasing."""
def test_thread_id_under_thread_key(self):
client = FakeClient()
s = make_session(client)
tid = s.ensure_started()
assert tid == "thread-fake-001"
def test_thread_session_id_alias_under_thread_key(self):
client = FakeClient()
client._request_handler = lambda method, params: (
{"thread": {"sessionId": "alias-1"},
"activePermissionProfile": {"id": "x"}}
if method == "thread/start" else
{"turn": {"id": "tu1"}} if method == "turn/start" else {}
)
s = make_session(client)
tid = s.ensure_started()
assert tid == "alias-1"
def test_top_level_session_id_fallback(self):
client = FakeClient()
client._request_handler = lambda method, params: (
{"sessionId": "top-1"} if method == "thread/start" else
{"turn": {"id": "tu1"}} if method == "turn/start" else {}
)
s = make_session(client)
tid = s.ensure_started()
assert tid == "top-1"
def test_missing_thread_id_raises(self):
from agent.transports.codex_app_server import CodexAppServerError
client = FakeClient()
client._request_handler = lambda method, params: (
{"thread": {}, "activePermissionProfile": {"id": "x"}}
if method == "thread/start" else
{"turn": {"id": "tu1"}}
)
s = make_session(client)
with pytest.raises(CodexAppServerError, match="no thread id"):
s.ensure_started()
class TestHasTurnAbortedMarker:
"""Unit coverage for the marker matcher itself."""
def test_empty_string(self):
from agent.transports.codex_app_server_session import (
_has_turn_aborted_marker,
)
assert _has_turn_aborted_marker("") is False
assert _has_turn_aborted_marker(None) is False # type: ignore[arg-type]
def test_plain_text_no_marker(self):
from agent.transports.codex_app_server_session import (
_has_turn_aborted_marker,
)
assert _has_turn_aborted_marker("normal response with no markers") is False
def test_open_marker(self):
from agent.transports.codex_app_server_session import (
_has_turn_aborted_marker,
)
assert _has_turn_aborted_marker("blah <turn_aborted> blah") is True
def test_self_closing_marker(self):
from agent.transports.codex_app_server_session import (
_has_turn_aborted_marker,
)
assert _has_turn_aborted_marker("<turn_aborted/>") is True
class TestClassifyOAuthFailure:
"""Unit coverage for the OAuth classifier; conservative on purpose."""
def test_invalid_grant_classified(self):
from agent.transports.codex_app_server_session import (
_classify_oauth_failure,
)
hint = _classify_oauth_failure("error: invalid_grant returned by server")
assert hint is not None
assert "codex login" in hint
def test_token_refresh_classified(self):
from agent.transports.codex_app_server_session import (
_classify_oauth_failure,
)
hint = _classify_oauth_failure("token refresh failed: network error")
assert hint is not None
assert "codex login" in hint
def test_401_classified(self):
from agent.transports.codex_app_server_session import (
_classify_oauth_failure,
)
hint = _classify_oauth_failure("HTTP 401 Unauthorized")
assert hint is not None
def test_generic_error_not_classified(self):
from agent.transports.codex_app_server_session import (
_classify_oauth_failure,
)
assert _classify_oauth_failure("connection reset") is None
assert _classify_oauth_failure("model returned bad json") is None
assert _classify_oauth_failure("rate limit exceeded") is None
def test_empty_inputs(self):
from agent.transports.codex_app_server_session import (
_classify_oauth_failure,
)
assert _classify_oauth_failure() is None
assert _classify_oauth_failure("") is None
assert _classify_oauth_failure("", None) is None # type: ignore[arg-type]
def test_multi_string_search(self):
"""Hint can come from any of the provided strings."""
from agent.transports.codex_app_server_session import (
_classify_oauth_failure,
)
hint = _classify_oauth_failure(
"rpc returned -32603",
"[stderr] token has expired, run codex login",
)
assert hint is not None

View file

@ -342,3 +342,77 @@ class TestErrorHandling:
assert result["completed"] is False
assert result["partial"] is True
assert result["error"] == "user interrupted"
class TestSessionRetirementOnRunAgent:
"""run_agent.py side: when run_turn returns should_retire=True, the
AIAgent must close + null _codex_session so the next turn respawns."""
def test_should_retire_drops_session(self, monkeypatch):
closes = {"count": 0}
def fake_run_turn(self, user_input, **kwargs):
return TurnResult(
final_text="",
projected_messages=[],
tool_iterations=0,
interrupted=True,
error="turn timed out after 600.0s",
turn_id="tu1",
thread_id="th1",
should_retire=True,
)
def fake_close(self):
closes["count"] += 1
monkeypatch.setattr(CodexAppServerSession, "ensure_started",
lambda self: "th1")
monkeypatch.setattr(CodexAppServerSession, "run_turn", fake_run_turn)
monkeypatch.setattr(CodexAppServerSession, "close", fake_close)
agent = _make_codex_agent()
with patch.object(agent, "_spawn_background_review", return_value=None):
result = agent.run_conversation("hi")
# The session was closed and cleared
assert closes["count"] == 1
assert getattr(agent, "_codex_session", "MISSING") is None
# Partial result was still returned (caller still sees the error)
assert result["partial"] is True
assert result["error"] == "turn timed out after 600.0s"
def test_normal_turn_keeps_session(self, fake_session):
"""fake_session fixture returns should_retire=False (default).
The session must stay attached for the next turn to reuse."""
agent = _make_codex_agent()
with patch.object(agent, "_spawn_background_review", return_value=None):
agent.run_conversation("hi")
# Session was lazily created and still attached.
assert getattr(agent, "_codex_session", None) is not None
def test_exception_path_also_drops_session(self, monkeypatch):
"""Even if run_turn raises (not just sets should_retire), we must
drop the session a thrown exception is the strongest possible
signal the process is dead."""
closes = {"count": 0}
def boom_run_turn(self, user_input, **kwargs):
raise RuntimeError("codex segfaulted")
def fake_close(self):
closes["count"] += 1
monkeypatch.setattr(CodexAppServerSession, "ensure_started",
lambda self: "th1")
monkeypatch.setattr(CodexAppServerSession, "run_turn", boom_run_turn)
monkeypatch.setattr(CodexAppServerSession, "close", fake_close)
agent = _make_codex_agent()
with patch.object(agent, "_spawn_background_review", return_value=None):
result = agent.run_conversation("hi")
assert closes["count"] == 1
assert agent._codex_session is None
assert result["completed"] is False
assert "codex segfaulted" in result["error"]