mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
feat(codex-runtime): optional codex app-server runtime for OpenAI/Codex models (#24182)
* feat(codex-runtime): scaffold optional codex app-server runtime
Foundational commit for an opt-in alternate runtime that hands OpenAI/Codex
turns to a 'codex app-server' subprocess instead of Hermes' tool dispatch.
Default behavior is unchanged.
Lands in three pieces:
1. agent/transports/codex_app_server.py — JSON-RPC 2.0 over stdio speaker
for codex's app-server protocol (codex-rs/app-server). Spawn, init
handshake, request/response, notification queue, server-initiated
request queue (for approval round-trips), interrupt-friendly blocking
reads. Tested against real codex 0.130.0 binary end-to-end during
development.
2. hermes_cli/runtime_provider.py:
- Adds 'codex_app_server' to _VALID_API_MODES.
- Adds _maybe_apply_codex_app_server_runtime() helper, called at the
end of _resolve_runtime_from_pool_entry(). Inert unless
'model.openai_runtime: codex_app_server' is set in config.yaml AND
provider in {openai, openai-codex}. Other providers cannot be
rerouted (anthropic, openrouter, etc. preserved).
3. tests/agent/transports/test_codex_app_server_runtime.py — 24 tests
covering api_mode registration, the rewriter helper (default-off,
case-insensitive, opt-in, non-eligible providers preserved), version
parser, missing-binary handling, error class. Does NOT require codex
CLI installed.
This commit is wire-only: the api_mode is recognized but AIAgent does
not yet branch on it. Followup commits add the session adapter, event
projector, approval bridge, transcript projection (so memory/skill
review still works), plugin migration, and slash command.
Existing tests remain green:
- tests/cli/test_cli_provider_resolution.py (29 passed)
- tests/agent/test_credential_pool_routing.py (included above)
* feat(codex-runtime): add codex item projector for memory/skill review
The translator that lets Hermes' self-improvement loop keep working under the
Codex runtime: converts codex 'item/*' notifications into Hermes' standard
{role, content, tool_calls, tool_call_id} message shape that
agent/curator.py already knows how to read.
Item taxonomy (matches codex-rs/app-server-protocol/src/protocol/v2/item.rs):
- userMessage → {role: user, content}
- agentMessage → {role: assistant, content: text}
- reasoning → stashed in next assistant's 'reasoning' field
- commandExecution → assistant tool_call(name='exec_command') + tool result
- fileChange → assistant tool_call(name='apply_patch') + tool result
- mcpToolCall → assistant tool_call(name='mcp.<server>.<tool>') + tool result
- dynamicToolCall → assistant tool_call(name=<tool>) + tool result
- plan/hookPrompt/etc → opaque assistant note, no fabricated tool_calls
Invariants preserved:
- Message role alternation never violated: each tool item produces at most
one assistant + one tool message in that order, correlated by call_id.
- Streaming deltas (item/<type>/outputDelta, item/agentMessage/delta)
don't materialize messages — only item/completed does. Mirrors how
Hermes already only writes the assistant message after streaming ends.
- Tool call ids are deterministic (codex item id-based) so replays produce
identical messages and prefix caches stay valid (AGENTS.md pitfall #16).
- JSON args use sorted_keys for the same reason.
Real wire formats verified against codex 0.130.0 by capturing live
notifications from thread/shellCommand and including one as a fixture
(COMMAND_EXEC_COMPLETED).
23 new tests, all green:
- Streaming deltas don't materialize (3 paths)
- Turn/thread frame events are silent
- commandExecution: 5 tests including non-zero exit annotation +
deterministic id stability across replays
- agentMessage + reasoning attachment + reasoning consumption
- fileChange: summary without inlined content
- mcpToolCall: namespaced naming + error surfacing
- userMessage: text fragments only (drops images/etc)
- opaque items: no fabricated tool_calls
- Helpers: deterministic id stability + sorted JSON args
- Role alternation invariant across all four tool-shaped item types
This commit is a pure addition. AIAgent integration (the wire that uses the
projector) is the next commit.
* feat(codex-runtime): add session adapter + approval bridge
The third self-contained module: CodexAppServerSession owns one Codex
thread per Hermes session, drives turn/start, consumes streaming
notifications via CodexEventProjector, handles server-initiated approval
requests, and translates cancellation into turn/interrupt.
The adapter has a single public per-turn method:
result = session.run_turn(user_input='...', turn_timeout=600)
# result.final_text → assistant text for the caller
# result.projected_messages → list ready to splice into AIAgent.messages
# result.tool_iterations → tick count for _iters_since_skill nudge
# result.interrupted → True on Ctrl+C / deadline / interrupt
# result.error → error string when the turn cannot complete
# result.turn_id, thread_id → for sessions DB / resume
Behavior:
- ensure_started() spawns codex, does the initialize handshake, and
issues thread/start with cwd + permissions profile. Idempotent.
- run_turn() blocks until turn/completed, drains server-initiated
requests (approvals) before reading notifications so codex never
deadlocks waiting for us, projects every item/completed via the
projector, and increments tool_iterations for the skill nudge gate.
- request_interrupt() is thread-safe (threading.Event); the next loop
iteration issues turn/interrupt and unwinds.
- turn_timeout deadlock guard issues turn/interrupt and records an
error if the turn never completes.
- close() escalates terminate → kill via the underlying client.
Approval bridge:
Codex emits server-initiated requests for execCommandApproval and
applyPatchApproval. The adapter translates Hermes' approval choice
vocabulary onto codex's decision vocabulary:
Hermes 'once' → codex 'approved'
Hermes 'session' or 'always' → codex 'approvedForSession'
Hermes 'deny' / anything else → codex 'denied'
Routing precedence:
1. _ServerRequestRouting.auto_approve_* flags (cron / non-interactive)
2. approval_callback wired by the CLI (defers to
tools.approval.prompt_dangerous_approval())
3. Fail-closed denial when neither is wired
Unknown server-request methods are answered with JSON-RPC error -32601
so codex doesn't hang waiting for us.
Permission profile mapping mirrors AGENTS.md:
Hermes 'auto' → codex 'workspace-write'
Hermes 'approval-required' → codex 'read-only-with-approval'
Hermes 'unrestricted/yolo' → codex 'full-access'
20 new tests, all green. Combined with prior commits this PR now has
67 tests across three modules:
- test_codex_app_server_runtime.py: 24 (api_mode + transport surface)
- test_codex_event_projector.py: 23 (item taxonomy projections)
- test_codex_app_server_session.py: 20 (turn loop + approvals + interrupts)
Full tests/agent/transports/ directory: 249/249 pass — no regressions
to existing transport tests.
Still no wire into AIAgent.run_conversation(); that integration commit
is small and goes next.
* feat(codex-runtime): wire codex_app_server runtime into AIAgent
The integration commit. AIAgent.run_conversation() now early-returns to a
new helper _run_codex_app_server_turn() when self.api_mode ==
'codex_app_server', bypassing the chat_completions tool loop entirely.
Three small surgical edits to run_agent.py (~105 LOC total):
1. Line ~1204 (constructor api_mode validation set):
Add 'codex_app_server' so an explicit api_mode='codex_app_server'
passed to AIAgent() isn't silently rewritten to 'chat_completions'.
2. Line ~12048 (run_conversation, just before the while loop):
Early-return to _run_codex_app_server_turn() when self.api_mode is
'codex_app_server'. Placed AFTER all standard pre-loop setup —
logging context, session DB, surrogate sanitization, _user_turn_count
and _turns_since_memory increments, _ext_prefetch_cache, memory
manager on_turn_start — so behavior outside the model-call loop is
identical between paths. Default Hermes flow is unchanged when the
flag is off.
3. End-of-class (line ~15497):
New method _run_codex_app_server_turn(). Lazy-instantiates one
CodexAppServerSession per AIAgent (reused across turns), runs the
turn, splices projected_messages into messages, increments
_iters_since_skill by tool_iterations (since the chat_completions
loop normally does that per iteration), fires
_spawn_background_review on the same cadence as the default path.
Counter accounting:
_turns_since_memory ← already incremented at run_conversation:11817
(gated on memory store configured) — codex
helper does NOT touch it (would double-count).
_user_turn_count ← already incremented at run_conversation:11793
— codex helper does NOT touch it.
_iters_since_skill ← incremented in the chat_completions loop per
tool iteration. Codex helper increments by
turn.tool_iterations since the loop is bypassed.
User message:
ALREADY appended to messages by run_conversation pre-loop (line 11823)
before the early-return reaches us. Helper does NOT append again.
Regression test test_user_message_not_duplicated guards this.
Approval callback wiring:
Lazy-fetches tools.terminal_tool._get_approval_callback at session
spawn time, passes to CodexAppServerSession. CLI threads with
prompt_toolkit get interactive approvals; gateway/cron contexts get
the codex-side fail-closed deny.
Error path:
Codex session exceptions become a 'partial' result with completed=False
and a final_response that explicitly tells the user how to switch back:
'Codex app-server turn failed: ... Fall back to default runtime with
/codex-runtime auto.' Same return-dict shape as the chat_completions
path so all callers (gateway, CLI, batch_runner, ACP) work unchanged.
9 new integration tests in tests/run_agent/test_codex_app_server_integration.py:
- api_mode='codex_app_server' is accepted on AIAgent construction
- run_conversation returns the expected codex shape
(final_response, codex_thread_id, codex_turn_id, completed, partial)
- Projected messages are spliced into messages list
- _iters_since_skill ticks per tool iteration
- _user_turn_count delegated to standard flow (not double-counted)
- User message appears exactly once (regression guard)
- _spawn_background_review IS invoked (memory/skill review keeps working)
- chat.completions.create is NEVER called (loop fully bypassed)
- Session exception → partial result with /codex-runtime auto hint
- Interrupted turn → partial result with error preserved
Adjacent test runs confirm no regressions:
- tests/run_agent/test_memory_nudge_counter_hydration.py: green
- tests/run_agent/test_background_review.py: green
- tests/run_agent/test_fallback_model.py: green
- tests/agent/transports/: 249/249 green
Still missing for full feature: /codex-runtime slash command, plugin
migration helper, docs page, live e2e test gated on codex binary. Those
are the remaining followup commits.
* feat(codex-runtime): add /codex-runtime slash command (CLI + gateway)
User-facing toggle for the optional codex app-server runtime. Follows the
'Adding a Slash Command (All Platforms)' pattern from AGENTS.md exactly:
single CommandDef in the central registry → CLI handler → gateway handler
→ running-agent guard → all surfaces (autocomplete, /help, Telegram menu,
Slack subcommands) update automatically.
Surface:
/codex-runtime — show current state + codex CLI status
/codex-runtime auto — Hermes default runtime
/codex-runtime codex_app_server — codex subprocess runtime
/codex-runtime on / off — synonyms
Files changed:
hermes_cli/codex_runtime_switch.py (new):
Pure-Python state machine shared by CLI and gateway. Parse args,
read/write model.openai_runtime in the config dict, gate enabling
behind a codex --version check (don't let users opt in to a runtime
they have no binary for; print npm install hint instead).
Returns a CodexRuntimeStatus dataclass that callers render however
suits their surface.
hermes_cli/commands.py:
Single CommandDef entry, no aliases (codex-runtime is its own thing).
cli.py:
Dispatch in process_command() + _handle_codex_runtime() handler that
delegates to the shared module and renders results via _cprint.
gateway/run.py:
Dispatch in _handle_message() + _handle_codex_runtime_command() that
returns a string (gateway sends as message). On a successful change
that requires a new session, _evict_cached_agent() forces the next
inbound message to construct a fresh AIAgent with the new api_mode —
avoids prompt-cache invalidation mid-session.
gateway/run.py running-agent guard:
/codex-runtime joins /model in the early-intercept block so a runtime
flip mid-turn can't split a turn across two transports.
Tests:
tests/hermes_cli/test_codex_runtime_switch.py — 25 tests covering the
state machine: arg parsing (10 cases incl. case-insensitive and
synonyms), reading current runtime (5 cases incl. malformed configs),
writing runtime (3 cases), apply() entry point covering read-only,
no-op, codex-missing-blocked, codex-present-success, disable-no-binary-check,
and persist-failure paths (8 cases). All green.
Adjacent test suites confirm no regressions:
- tests/hermes_cli/test_commands.py + test_codex_runtime_switch.py:
167/167 green
- tests/agent/transports/: 283/283 green when combined with prior commits
Still missing: plugin migration helper, docs page, live e2e test gated on
codex binary. Followup commits.
* feat(codex-runtime): auto-migrate Hermes MCP servers to ~/.codex/config.toml
Translates the user's mcp_servers config from ~/.hermes/config.yaml into
the TOML format codex's MCP client expects. Wired into the
/codex-runtime codex_app_server enable path so users get their MCP tool
surface in the spawned subprocess automatically.
The migration runs on every enable. Failures are non-fatal — the runtime
change still proceeds and the user gets a warning so they can fix the
codex config manually.
What translates (mapping verified against codex-rs/core/src/config/edit.rs):
Hermes mcp_servers.<n>.command/args/env → codex stdio transport
Hermes mcp_servers.<n>.url/headers → codex streamable_http transport
Hermes mcp_servers.<n>.timeout → codex tool_timeout_sec
Hermes mcp_servers.<n>.connect_timeout → codex startup_timeout_sec
Hermes mcp_servers.<n>.cwd → codex stdio cwd
Hermes mcp_servers.<n>.enabled: false → codex enabled = false
What does NOT translate (warned + skipped per server):
Hermes-specific keys (sampling, etc.) — codex's MCP client has no
equivalent. Listed in the per-server skipped[] field of the report.
What's NOT migrated (intentional):
AGENTS.md — codex respects this file natively in its cwd. Hermes' own
AGENTS.md (project-level) is already in the worktree, so codex picks
it up without translation. No code needed.
Idempotency design:
All managed content lives between a 'managed by hermes-agent' marker
and the next non-mcp_servers section header. _strip_existing_managed_block
removes the prior managed region cleanly, preserving any user-added
codex config (model, providers.openai, sandbox profiles, etc.) above
or below.
Files added:
hermes_cli/codex_runtime_plugin_migration.py — pure-Python migration
helper. Public API: migrate(hermes_config, codex_home=None,
dry_run=False) returns MigrationReport with .migrated/.errors/
.skipped_keys_per_server. No external TOML dependency — minimal
formatter handles strings/numbers/booleans/lists/inline-tables.
tests/hermes_cli/test_codex_runtime_plugin_migration.py — 39 tests
covering:
- per-server translation (12): stdio/http/sse, cwd, timeouts,
enabled flag, command+url precedence, sampling drop, unknown keys
- TOML formatter (8): types, escaping, inline tables, error case
- existing-block stripping (4): no marker, alone, with user content
above, with user content below
- end-to-end migrate() (8): empty, dry-run, round-trip, idempotent
re-run, preserves user config, error reporting, invalid input,
summary formatting
Files changed:
hermes_cli/codex_runtime_switch.py — apply() now calls migrate() in
the codex_app_server enable branch. Migration failure logs a warning
in the result message but does NOT fail the runtime change. Disable
path (auto) explicitly skips migration.
tests/hermes_cli/test_codex_runtime_switch.py — 3 new tests:
test_enable_triggers_mcp_migration, test_disable_does_not_trigger_migration,
test_migration_failure_does_not_block_enable.
All 325 feature tests green:
- tests/agent/transports/: 249 (incl. 67 new)
- tests/run_agent/test_codex_app_server_integration.py: 9
- tests/hermes_cli/test_codex_runtime_switch.py: 28 (3 new)
- tests/hermes_cli/test_codex_runtime_plugin_migration.py: 39 (new)
* perf(codex-runtime): cache codex --version check within apply()
Single /codex-runtime invocation could spawn 'codex --version' up to 3
times (state report, enable gate, success message). Each spawn is ~50ms,
so the cumulative cost wasn't a crisis, but it was wasteful and turned a
trivial slash command into something noticeably laggy on slower systems.
Refactored to lazy-once via a closure over a nonlocal cache. First call
spawns; subsequent calls in the same apply() reuse the result.
Behavior unchanged — same return shape, same error handling, same install
hint when codex is missing. Just one subprocess per call instead of three.
Two regression-guard tests added:
- test_binary_check_cached_within_apply: enable path → call_count == 1
- test_binary_check_cached_on_read_only_call: state-report path → call_count == 1
Total tests for /codex-runtime now 30 (was 28); all 143 codex-runtime
tests still green.
* fix(codex-runtime): correct protocol field names found via live e2e test
Three real bugs caught only by running a turn end-to-end against codex
0.130.0 with a real ChatGPT subscription. Unit tests passed because they
asserted on our own (incorrect) wire shapes; the wire format from
codex-rs/app-server-protocol/src/protocol/v2/* is the source of truth and
my initial reading of the README was incomplete.
Bug 1: thread/start.permissions wire format
Was sending {"profileId": "workspace-write"}.
Real format per PermissionProfileSelectionParams enum (tagged union):
{"type": "profile", "id": "workspace-write"}
AND requires the experimentalApi capability declared during initialize.
AND requires a matching [permissions] table in ~/.codex/config.toml or
codex fails the request with 'default_permissions requires a [permissions]
table'.
Fix: stop overriding permissions on thread/start. Codex picks its default
profile (read-only unless user configures otherwise), which matches what
codex CLI users expect — they configure their default permission profile
in ~/.codex/config.toml the standard way. Trying to be clever about
profile selection broke every turn we tested.
Live error before fix: 'Invalid request: missing field type' on every
turn/start, even though our turn/start payload was correct — the field
codex was complaining about was inside the permissions sub-object we
shouldn't have been sending.
Bug 2: server-request method names
Was matching 'execCommandApproval' and 'applyPatchApproval'.
Real names per common.rs ServerRequest enum:
item/commandExecution/requestApproval
item/fileChange/requestApproval
item/permissions/requestApproval (new third method)
Fix: match the documented names. Added handler for
item/permissions/requestApproval that always declines — codex sometimes
asks to escalate permissions mid-turn and silent acceptance would surprise
users.
Live symptom before fix: agent.log showed
'Unknown codex server request: item/commandExecution/requestApproval'
and codex stalled because we replied with -32601 (unsupported method)
instead of an approval decision. The agent reported back 'The write
command was rejected' even though Hermes never showed the user an
approval prompt.
Bug 3: approval decision values
Was sending decision strings 'approved'/'approvedForSession'/'denied'.
Real values per CommandExecutionApprovalDecision enum (camelCase):
accept, acceptForSession, decline, cancel
(also AcceptWithExecpolicyAmendment and ApplyNetworkPolicyAmendment
variants we don't currently use).
Fix: rename _approval_choice_to_codex_decision return values; update
auto_approve_* fallbacks; update fail-closed default from 'denied' to
'decline'. Test mapping table updated to match.
Live test verified after fixes:
$ hermes (with model.openai_runtime: codex_app_server)
> Run the shell command: echo hermes-codex-livetest > .../proof.txt
then read it back
Approval prompt fired with 'Codex requests exec in <cwd>'.
User chose 'Allow once'. Codex executed the command, wrote the file,
read it back. Final response: 'Read back from proof.txt:
hermes-codex-livetest'. File contents on disk match.
agent.log confirms:
codex app-server thread started: id=019e200e profile=workspace-write
cwd=/tmp/hermes-codex-livetest/workspace
All 20 session tests still green after wire-format updates.
* fix(codex-runtime): correct apply_patch approval params + ship docs
Live e2e revealed FileChangeRequestApprovalParams doesn't carry the
changeset (just itemId, threadId, turnId, reason, grantRoot) — Codex's
'reason' field describes what the patch wants to do. Test config and
display logic updated to use it. The first 'apply_patch (0 change(s))'
display from the live test is now 'apply_patch: <reason>'.
Adds website/docs/user-guide/features/codex-app-server-runtime.md
covering enable/disable, prerequisites, approval UX, MCP migration
behavior, permission profile delegation to ~/.codex/config.toml, known
limitations, and the architecture diagram. Wired into the Automation
category in sidebars.ts.
Live e2e validation across the path matrix:
✓ thread/start handshake
✓ turn/start with text input
✓ commandExecution items + projection
✓ item/commandExecution/requestApproval → Hermes UI → response
✓ Approve once → command runs
✓ Deny → command rejected, codex falls back to read-only message
✓ Multi-turn (codex remembers prior turn's results)
✓ apply_patch via Codex's fileChange path
✓ item/fileChange/requestApproval → Hermes UI
✓ MCP server migration loads inside spawned codex (verified via
'use the filesystem MCP tool' prompt)
✓ /codex-runtime auto → codex_app_server toggle cycle
✓ Disable doesn't trigger migration
✓ Enable with codex CLI present succeeds + migrates
✓ Hermes-side interrupt path (turn/interrupt request issued cleanly
even if codex finishes before the interrupt lands)
Known live-validated limitations now documented in the docs page:
- delegate_task subagents unavailable on this runtime
- permission profile selection delegated to ~/.codex/config.toml
- apply_patch approval prompt has no inline changeset (codex protocol
doesn't expose it)
145/145 codex-runtime tests still green.
* feat(codex-runtime): native plugin migration + UX polish (quirks 2/4/5/10/11)
Major: migrate native Codex plugins (#7 in OpenClaw's PR list)
Discovers installed curated plugins via codex's plugin/list RPC and
writes [plugins."<name>@<marketplace>"] entries to ~/.codex/config.toml
so they're enabled in the spawned Codex sessions. This is the
'YouTube-video-worthy' bit Pash highlighted: when a user has
google-calendar, github, etc. installed in their Codex CLI, those
plugins activate automatically when they enable Hermes' codex runtime.
Implementation:
- hermes_cli/codex_runtime_plugin_migration.py: new _query_codex_plugins()
helper spawns 'codex app-server' briefly and walks plugin/list. Returns
(plugins, error) — failures are non-fatal so MCP migration still works.
- render_codex_toml_section() now takes plugins + permissions args.
- migrate() defaults: discover_plugins=True, default_permission_profile=
'workspace-write'. Explicit None on either disables that side.
- _strip_existing_managed_block() now also strips [plugins.*] and
[permissions]/[permissions.*] sections inside the managed block, so
re-runs replace plugins cleanly without touching codex's own config.
Quirk fixes:
#2 Default permissions profile written on enable.
Without this, Codex's read-only default kicks in and EVERY write
triggers an approval prompt. Now writes [permissions] default =
'workspace-write' so the runtime feels normal out of the box. Set
default_permission_profile=None to opt out.
#4 apply_patch approval prompt now shows what's changing.
Codex's FileChangeRequestApprovalParams doesn't carry the changeset.
Session adapter now caches the fileChange item from item/started
notifications and looks it up by itemId when codex requests approval.
Prompt shows '1 add, 1 update: /tmp/new.py, /tmp/old.py' instead of
'apply_patch (0 change(s))'.
Side benefit: also drains pending notifications BEFORE handling a
server request, so the projector and per-turn caches are up to date
when the approval decision fires. Bounded to 8 notifications per
loop iter to avoid starving codex's response.
#5/#10 Exec approval prompt never shows empty cwd.
When codex omits cwd in CommandExecutionRequestApprovalParams, fall
back to the session's cwd. If somehow neither is available, show
'<unknown>' explicitly instead of an empty string.
Also surfaces 'reason' from the approval params when codex provides
it — gives users more context on why codex wants to run something.
#11 Banner indicates the codex_app_server runtime when active.
New 'Runtime: codex app-server (terminal/file ops/MCP run inside
codex)' line appears in the welcome banner only when the runtime is
on. Default banner is unchanged.
Tests:
- 7 new tests in test_codex_runtime_plugin_migration.py covering
plugin discovery (mocked), failure handling, dry-run skip, opt-out
flag, idempotent re-runs, and permissions writing.
- 3 new tests in test_codex_app_server_session.py covering the
enriched approval prompts: cwd fallback, change summary on
apply_patch, fallback when no item/started cache exists.
- All 26 session tests + 46 migration tests green; 153 total in PR.
* feat(codex-runtime): hermes-tools MCP callback + native plugin migration
The big architectural addition: when codex_app_server runtime is on,
Hermes registers its own tool surface as an MCP server in
~/.codex/config.toml so the codex subprocess can call back into Hermes
for tools codex doesn't ship with — web_search, browser_*, vision,
image_generate, skills, TTS.
Also: 'migrate native codex plugins' (Pash's YouTube-video-worthy bit) —
when the user has plugins like Linear, GitHub, Gmail, Calendar, Canva
installed via 'codex plugin', Hermes discovers them via plugin/list and
writes [plugins.<name>@openai-curated] entries so they activate
automatically.
New module: agent/transports/hermes_tools_mcp_server.py
FastMCP stdio server exposing 17 Hermes tools. Each call dispatches
through model_tools.handle_function_call() — same code path as the
Hermes default runtime. Run with:
python -m agent.transports.hermes_tools_mcp_server [--verbose]
Exposed: web_search, web_extract, browser_navigate / _click / _type /
_press / _snapshot / _scroll / _back / _get_images / _console /
_vision, vision_analyze, image_generate, skill_view, skills_list,
text_to_speech.
NOT exposed (deliberately):
- terminal/shell/read_file/write_file/patch — codex has built-ins
- delegate_task/memory/session_search/todo — _AGENT_LOOP_TOOLS in
model_tools.py:493, require running AIAgent context. Documented
as a limitation and surfaced in the slash command output.
Migration changes (hermes_cli/codex_runtime_plugin_migration.py):
- _query_codex_plugins() spawns 'codex app-server' briefly to walk
plugin/list and pull installed openai-curated plugins. Failures are
non-fatal — MCP migration still completes.
- render_codex_toml_section() now takes plugins + permissions args
AND wraps the managed block with a MIGRATION_END_MARKER comment so
the stripper can reliably find both ends, even when the block
contains top-level keys (default_permissions = ...).
- migrate() defaults: discover_plugins=True, expose_hermes_tools=True,
default_permission_profile=':workspace' (built-in codex profile name
— must be prefixed with ':'). All three opt-out via explicit args.
- _build_hermes_tools_mcp_entry() builds the codex stdio entry with
HERMES_HOME and PYTHONPATH passthrough so a worktree-launched
Hermes points the MCP subprocess at the same module layout.
Live-caught wire bugs fixed during this turn:
1. Permission profile config key is top-level , NOT a [permissions] table. The [permissions] table is
for *user-defined* profiles with structured fields. Built-in
profile names start with ':' (':workspace', ':read-only',
':danger-no-sandbox'). Was emitting
which codex rejected with 'invalid type: string "X", expected
struct PermissionProfileToml'.
2. Built-in profile is , NOT . Codex
rejected with 'unknown built-in profile'.
3. Codex's MCP layer sends for
tool-call confirmation. We weren't handling it, so codex stalled
and returned 'MCP tool call was rejected'. Now: auto-accept for
our own hermes-tools server (user already opted in by enabling
the runtime), decline for third-party servers.
Quirk fixes shipped (from the limitations list):
#2 default permissions: workspace profile written on enable. No more
approval prompt on every write.
#4 apply_patch approval shows what's changing: cache fileChange
items from item/started, look up by itemId when codex sends
item/fileChange/requestApproval. Prompt: '1 add, 1 update:
/tmp/new.py, /tmp/old.py' instead of '0 change(s)'.
#5/#10 exec approval cwd never empty: fall back to session cwd, then
'<unknown>'. Also surfaces 'reason' from codex when present.
#11 banner shows 'Runtime: codex app-server' line when active so
users understand why tool counts may not match what's reachable.
Tests:
- 5 new tests in test_codex_runtime_plugin_migration.py covering
plugin discovery, expose_hermes_tools entry generation, idempotent
re-runs, opt-out flag, permissions profile.
- 3 new tests in test_codex_app_server_session.py covering enriched
approval prompts (cwd fallback, fileChange summary).
- 2 new tests for mcpServer/elicitation/request handling (accept
hermes-tools, decline others).
- New test file test_hermes_tools_mcp_server.py covering module
surface, EXPOSED_TOOLS safety invariants (no shell/file_ops,
no agent-loop tools), and main() error paths.
- 166 codex-runtime tests total, all green.
Live e2e validated against codex 0.130.0 + ChatGPT subscription:
✓ /codex-runtime codex_app_server enables, migrates filesystem MCP,
registers hermes-tools, writes default_permissions = ':workspace'
✓ Banner shows 'Runtime: codex app-server' line in subsequent sessions
✓ Shell command runs without approval prompt (workspace profile works)
✓ Multi-turn — codex remembers prior turn's results
✓ apply_patch path via fileChange request approval
✓ web_search via hermes-tools MCP callback returns real Firecrawl
results: 'OpenAI Codex CLI – Getting Started' end-to-end in 13s
✓ Disable cycle clean
Docs updated: website/docs/user-guide/features/codex-app-server-runtime.md
Full re-write covering native plugin migration, the hermes-tools
callback architecture, the prerequisites change ('codex login is
separate from hermes auth login codex'), the trade-off table now
reflecting which Hermes tools work via callback, and the limitations
list updated with what's actually unavailable on this runtime.
* feat(codex-runtime): pin user-config preservation invariant for quirk #6
Quirk #6 from the limitations list — user MCP servers / overrides /
codex-only sections in ~/.codex/config.toml that live OUTSIDE the
hermes-managed block must survive re-migration verbatim.
This already worked thanks to the MIGRATION_MARKER + MIGRATION_END_MARKER
pair I added when fixing the default_permissions wire format (so the
strip can find both ends of the managed region even with top-level
keys like default_permissions). But it was an emergent property
without a test pinning it.
Now explicitly tested:
- User MCP server above the managed block survives migration
- User MCP server below the managed block survives migration
- Both above + below survive a second re-migration
- User content (model, providers, sandbox, otel, etc.) outside our
region is left untouched
Docs added a section "Editing ~/.codex/config.toml safely" explaining
the marker contract — so users know they can add their own MCP
servers, override permissions, configure codex-only options, etc.
without fear of Hermes overwriting their work.
167 codex-runtime tests, all green.
* docs(codex-runtime): clarify the actual tool surface — shell covers terminal/read/write/find
Previous docs and PR description undersold what codex's built-in
toolset actually provides. apply_patch alone made it sound like the
runtime could only edit files in patch format — implying you'd lose
terminal use, read_file, write_file, search/find. That was wrong.
Codex's 'shell' tool runs arbitrary shell commands inside the sandbox,
which covers everything you'd do in bash: cat/head/tail (read), echo>
or heredocs (write), find/rg/grep (search), ls/cd (navigate), build/
test/git/etc. apply_patch is for structured multi-file edits on top
of that. update_plan is its in-runtime todo. view_image loads images.
And codex has its own web_search built in (in addition to the
Firecrawl-backed one Hermes exposes via MCP callback).
Docs now have a 'What tools the model actually has' section right
after Why, breaking the surface into three clearly-labeled buckets:
1. Codex's built-in toolset (always on) — shell, apply_patch,
update_plan, view_image, web_search; covers everything terminal-
adjacent.
2. Native Codex plugins (auto-migrated from your codex plugin
install) — Linear, GitHub, Gmail, Calendar, Outlook, Canva, etc.
3. Hermes tool callback (MCP server in ~/.codex/config.toml) —
web_search/web_extract via Firecrawl, browser_*, vision_analyze,
image_generate, skill_view/skills_list, text_to_speech.
Plus a 'What's NOT available' callout listing the four agent-loop tools
(delegate_task, memory, session_search, todo) that need running
AIAgent context and can't reach the codex runtime.
Trade-offs table broken out: shell, apply_patch, update_plan,
view_image, sandbox each get their own row with a one-line description
so users can see at a glance what's available natively.
Architecture diagram updated to list the codex built-ins by name
instead of 'apply_patch + shell + sandbox'.
No code changes — purely docs clarification. 167 codex-runtime tests
still green.
* fix(codex-runtime): _spawn_background_review signature + review fork api_mode downgrade
Two real bugs in the self-improvement loop integration that the previous
test mocked away.
Bug 1: wrong call signature
The codex helper was calling self._spawn_background_review() with no
args after every turn. That function actually requires:
messages_snapshot=list (positional or keyword)
review_memory=bool (at least one trigger must be True)
review_skills=bool
So the call would have raised TypeError at runtime — except the only
test that exercised this path mocked _spawn_background_review entirely
and just asserted spawn.called, so the wrong-arg shape never surfaced.
Bug 2: review fork inherits codex_app_server api_mode
The review fork is constructed with:
api_mode = _parent_runtime.get('api_mode')
So when the parent is codex_app_server, the review fork ALSO runs as
codex_app_server. But the review fork's whole job is to call agent-loop
tools (memory, skill_manage) which require Hermes' own dispatch — they
short-circuit with 'must be handled by the agent loop' on the codex
runtime. So the review fork would have run, decided to save something,
called memory or skill_manage, and silently no-op'd.
Fixed in run_agent.py:_spawn_background_review() — when the parent
api_mode is 'codex_app_server', the review fork is downgraded to
'codex_responses' (same OAuth credentials, same openai-codex provider,
but talks to OpenAI's Responses API directly so Hermes owns the loop).
Also rewrote the codex helper's review wiring to match the
chat_completions path:
- Computes _should_review_memory in the pre-loop block (was already
being computed; now passed through to the helper as an arg).
- Computes _should_review_skills AFTER the codex turn returns +
counters tick (line ~15432 pattern in chat_completions).
- Calls _spawn_background_review(messages_snapshot=, review_memory=,
review_skills=) only when at least one trigger fires.
- Adds the external memory provider sync (_sync_external_memory_for_turn)
that the chat_completions path runs after every turn.
Tests:
Replaced the broken test_background_review_invoked (which only
asserted spawn.called) with three sharper tests:
- test_background_review_NOT_invoked_below_threshold:
single turn at default thresholds → no review fires (would have
caught the original 'every turn calls spawn with no args' bug)
- test_background_review_skill_trigger_fires_above_threshold:
10 tool_iterations at threshold=10 → review fires with
messages_snapshot=list, review_skills=True, counter resets
- test_background_review_signature_never_breaks: regression guard
asserting positional args are always empty and kwargs include
messages_snapshot
New TestReviewForkApiModeDowngrade class:
- test_codex_app_server_parent_downgrades_review_fork: drives the
real _spawn_background_review function (no mock at that level),
asserts the review_agent gets api_mode='codex_responses' when
the parent was codex_app_server.
Live-validated against real run_conversation:
- Counter ticked from 0 to 5 after a 5-tool-iteration turn
- _spawn_background_review fired exactly once with kwargs-only signature
- review_skills=True, review_memory=False
- messages_snapshot was 12 entries (5 assistant tool_calls + 5 tool
results + 1 final assistant + initial system/user)
- Counter reset to 0 after fire
170 codex-runtime tests, all green.
Docs: added a Self-improvement loop section to the codex runtime page
explaining both how the trigger logic stays equivalent and that the
review fork is auto-downgraded to codex_responses for the agent-loop
tools. Also clarified that apply_patch and update_plan ARE codex's
built-in tools (the previous version made it sound like they were
separate from 'codex's stuff' — they're not, all five tools listed
in 'What tools the model actually has' section 1 are codex built-ins).
* feat(codex-runtime): expose kanban tools through Hermes MCP callback
Kanban workers spawn as separate hermes chat -q subprocesses that read
the user's config.yaml. If model.openai_runtime: codex_app_server is set
globally (which is the whole point of opt-in), every dispatched worker
ALSO comes up on the codex runtime.
That mostly works — codex's built-in shell + apply_patch + update_plan
do the actual task work fine — but it had one critical break: the
worker handoff tools (kanban_complete, kanban_block, kanban_comment,
kanban_heartbeat) are Hermes-registered tools, not codex built-ins.
On the codex runtime, codex builds its own tool list and these never
reach the model, so the worker would do the work but not be able to
report back, hanging until the dispatcher's timeout escalates it as
zombie.
Fix: add all 9 kanban tools to the EXPOSED_TOOLS list in the Hermes
MCP callback. They dispatch statelessly through handle_function_call()
just like web_search and the others — they read HERMES_KANBAN_TASK
from env (set by the dispatcher), gate correctly (worker tools require
the env var, orchestrator tools require it unset), and write to
~/.hermes/kanban.db.
Why kanban tools work via stateless dispatch when delegate_task/memory/
session_search/todo don't: those four are listed in _AGENT_LOOP_TOOLS
(model_tools.py:493) and short-circuit in handle_function_call() with
'must be handled by the agent loop' — they need to mutate AIAgent's
mid-loop state. Kanban tools have no such requirement; they're pure
side-effect functions against the kanban.db plus state_meta.
Tools exposed:
Worker handoff (require HERMES_KANBAN_TASK):
kanban_complete, kanban_block, kanban_comment, kanban_heartbeat
Read-only board queries:
kanban_show, kanban_list
Orchestrator (require HERMES_KANBAN_TASK unset):
kanban_create, kanban_unblock, kanban_link
Tests:
- test_kanban_worker_tools_exposed: complete/block/comment/heartbeat
in EXPOSED_TOOLS (regression guard for the would-hang-worker bug)
- test_kanban_orchestrator_tools_exposed: create/show/list/unblock/link
Docs:
- New 'Workflow features' section in the docs page covering /goal,
kanban, and cron behavior on this runtime
- /goal: works fully via run_conversation feedback; only caveat is
approval-prompt noise on long writes-heavy goals (mitigated by
the default :workspace permission profile)
- Kanban: enumerated which tools are reachable via the callback and
why the env var propagates correctly through the codex subprocess
to the MCP server subprocess
- Cron: documented as 'not specifically tested' — same rules as the
CLI apply since cron runs through AIAgent.run_conversation
- Trade-offs table gained rows for /goal, kanban worker, kanban
orchestrator
172/172 codex-runtime tests green (+2 from kanban tests).
* docs(codex-runtime): wire /codex-runtime into slash-commands ref + flag aux token cost
Three docs gaps caught during a final audit:
1. /codex-runtime was only in the feature docs page, not in the
slash-commands reference. Added rows to both the CLI section and
the Messaging section so users discover it where they'd look for
slash command syntax.
2. CODEX_HOME and HERMES_KANBAN_TASK weren't in environment-variables.md.
CODEX_HOME lets users redirect Codex CLI's config dir (the migration
honors it). HERMES_KANBAN_TASK is set by the kanban dispatcher and
propagates to the codex subprocess + the hermes-tools MCP subprocess
so kanban worker tools gate correctly — documented as 'don't set
manually' since it's an internal handoff.
3. Aux client behavior on this runtime. When openai_runtime=
codex_app_server is on with the openai-codex provider, every aux
task (title generation, context compression, vision auto-detect,
session search summarization, the background self-improvement review
fork) flows through the user's ChatGPT subscription by default.
This is true for the existing codex_responses path too, but it's
more visible / important here because users explicitly opted in for
subscription billing. Added a 'Auxiliary tasks and ChatGPT
subscription token cost' section to the docs page with a YAML
example showing how to override specific aux tasks to a cheaper
model (typically google/gemini-3-flash-preview via OpenRouter).
Also documents how the self-improvement review fork gets
auto-downgraded from codex_app_server to codex_responses by the
fix earlier in this PR.
No code changes — pure docs. 172 codex-runtime tests still green.
* docs+test(codex-runtime): pin HOME passthrough, document multi-profile + CODEX_HOME
OpenClaw hit a real footgun in openclaw/openclaw#81562: when spawning
codex app-server they were synthesizing a per-agent HOME alongside
CODEX_HOME. That made every subprocess codex's shell tool launches
(gh, git, aws, npm, gcloud, ...) see a fake $HOME and miss the user's
real config files. They had to back it out in PR #81562 — keep
CODEX_HOME isolation, leave HOME alone.
Audit confirms Hermes' codex spawn doesn't have this problem. We do
os.environ.copy() and only overlay CODEX_HOME (when provided) and
RUST_LOG. HOME passes through unchanged. But it was an emergent
property without a test pinning it, so adding a regression guard:
test_spawn_env_preserves_HOME — confirms parent HOME survives intact
in the subprocess env
test_spawn_env_sets_CODEX_HOME_when_provided — confirms codex_home
arg still isolates
codex state correctly
Docs additions:
'HOME environment variable passthrough' section — calls out the
contract explicitly: CODEX_HOME isolates codex's own state, HOME
stays user-real so gh/git/aws/npm/etc. find their normal config.
Cites openclaw#81562 as the cautionary tale.
'Multi-profile / multi-tenant setups' section — addresses the
related concern: profiles share ~/.codex/ by default. For users who
want per-profile codex isolation (separate auth, separate plugins),
documents the manual CODEX_HOME=<profile-scoped-dir> approach.
Explains why we DON'T auto-scope CODEX_HOME per profile: doing so
would silently invalidate existing codex login state for anyone
upgrading to this PR with tokens already at ~/.codex/auth.json.
Opt-in is safer than surprising users.
174 codex-runtime tests (+2 from HOME guards), all green.
* fix(codex-runtime): TOML control-char escapes + atomic config.toml write
Two footguns caught in a final audit pass before merge.
Bug 1: TOML control characters not escaped
The _format_toml_value() helper escaped backslashes and double quotes
but passed literal control characters (\n, \t, \r, \f, \b) through
unchanged. TOML basic strings don't allow literal control characters
— a path or env var containing a newline would produce invalid TOML
that codex refuses to load.
Realistic exposure: pathological cases like a HERMES_HOME with a
trailing newline (env var concatenation accident), or a PYTHONPATH
with a tab from a multi-line shell heredoc.
Fix: escape all five TOML basic-string control sequences (\b \t \n
\f \r) in addition to \\ and \" that we already did. Order
matters — backslash must come first or the other escapes get
re-escaped.
Bug 2: config.toml write wasn't atomic
If the python process crashed between target.mkdir() and the
write_text() finishing, a half-written config.toml could be left
behind. On NFS / Windows / some FUSE mounts this is a real concern;
on ext4/APFS small writes are usually atomic in practice but not
guaranteed.
Fix: write to a tempfile.mkstemp() temp file in the same directory,
then Path.replace() (atomic same-dir rename on POSIX, ReplaceFile on
Windows). On rename failure, clean up the temp file so repeated
failed migrations don't pile up .config.toml.* files.
Tests:
- test_string_with_newline_escaped — \n in value → \n in output
- test_string_with_tab_escaped — \t in value → \t in output
- test_string_with_other_controls_escaped — \r, \f, \b
- test_windows_path_escaped_correctly — backslash doubling
- test_atomic_write_no_temp_leak_on_success — no .config.toml.*
left over after a successful write
- test_atomic_write_cleanup_on_rename_failure — temp file removed
when Path.replace raises (simulated disk full)
180 codex-runtime tests, all green (+6 from this commit).
Footguns audited but NOT fixed (with rationale):
- Concurrent migrations race. Two Hermes processes hitting
/codex-runtime codex_app_server within seconds of each other could
cause one writer to lose entries. Low probability (you'd have to
enable from two surfaces simultaneously) and low impact (just re-run
migration). Adding fcntl/msvcrt locking is more code than it's
worth here. The atomic rename above means each individual write is
consistent — only the merge step is racy.
- Codex protocol version drift. We pin MIN_CODEX_VERSION=0.125 and
check at runtime but don't reject too-new versions. Right call —
the protocol has been stable through 0.125 → 0.130. If OpenAI
breaks it later we'd see the error in test_codex_app_server_runtime
on CI before users hit it.
This commit is contained in:
parent
9d42c2c286
commit
091d8e1030
23 changed files with 5395 additions and 3 deletions
243
tests/agent/transports/test_codex_app_server_runtime.py
Normal file
243
tests/agent/transports/test_codex_app_server_runtime.py
Normal file
|
|
@ -0,0 +1,243 @@
|
|||
"""Tests for the optional codex app-server runtime gate.
|
||||
|
||||
These are unit tests for the api_mode rewriter and the wire-level transport
|
||||
module. They do NOT require the `codex` CLI to be installed — that's
|
||||
covered by a separate live test gated on `codex --version`.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from hermes_cli.runtime_provider import (
|
||||
_VALID_API_MODES,
|
||||
_maybe_apply_codex_app_server_runtime,
|
||||
)
|
||||
|
||||
|
||||
class TestApiModeRegistration:
|
||||
"""The new api_mode must be registered or downstream parsing rejects it."""
|
||||
|
||||
def test_codex_app_server_is_a_valid_api_mode(self) -> None:
|
||||
assert "codex_app_server" in _VALID_API_MODES
|
||||
|
||||
def test_existing_api_modes_still_present(self) -> None:
|
||||
# Regression guard: don't accidentally delete other api_modes when
|
||||
# touching this set.
|
||||
for mode in (
|
||||
"chat_completions",
|
||||
"codex_responses",
|
||||
"anthropic_messages",
|
||||
"bedrock_converse",
|
||||
):
|
||||
assert mode in _VALID_API_MODES
|
||||
|
||||
|
||||
class TestMaybeApplyCodexAppServerRuntime:
|
||||
"""The opt-in helper that rewrites api_mode → codex_app_server."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"model_cfg",
|
||||
[
|
||||
None,
|
||||
{},
|
||||
{"openai_runtime": ""},
|
||||
{"openai_runtime": "auto"},
|
||||
{"openai_runtime": "AUTO"},
|
||||
{"other_key": "codex_app_server"}, # wrong key
|
||||
],
|
||||
)
|
||||
def test_default_off_for_openai(self, model_cfg) -> None:
|
||||
"""Default behavior is preserved when the flag is unset/auto."""
|
||||
got = _maybe_apply_codex_app_server_runtime(
|
||||
provider="openai", api_mode="chat_completions", model_cfg=model_cfg
|
||||
)
|
||||
assert got == "chat_completions"
|
||||
|
||||
def test_opt_in_rewrites_openai(self) -> None:
|
||||
got = _maybe_apply_codex_app_server_runtime(
|
||||
provider="openai",
|
||||
api_mode="chat_completions",
|
||||
model_cfg={"openai_runtime": "codex_app_server"},
|
||||
)
|
||||
assert got == "codex_app_server"
|
||||
|
||||
def test_opt_in_rewrites_openai_codex(self) -> None:
|
||||
got = _maybe_apply_codex_app_server_runtime(
|
||||
provider="openai-codex",
|
||||
api_mode="codex_responses",
|
||||
model_cfg={"openai_runtime": "codex_app_server"},
|
||||
)
|
||||
assert got == "codex_app_server"
|
||||
|
||||
def test_case_insensitive(self) -> None:
|
||||
got = _maybe_apply_codex_app_server_runtime(
|
||||
provider="openai",
|
||||
api_mode="chat_completions",
|
||||
model_cfg={"openai_runtime": "Codex_App_Server"},
|
||||
)
|
||||
assert got == "codex_app_server"
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"provider",
|
||||
[
|
||||
"anthropic",
|
||||
"openrouter",
|
||||
"xai",
|
||||
"qwen-oauth",
|
||||
"google-gemini-cli",
|
||||
"opencode-zen",
|
||||
"bedrock",
|
||||
"",
|
||||
],
|
||||
)
|
||||
def test_other_providers_never_rerouted(self, provider) -> None:
|
||||
"""Non-OpenAI providers MUST NOT be rerouted even with the flag set —
|
||||
codex's app-server can only run OpenAI/Codex auth flows."""
|
||||
got = _maybe_apply_codex_app_server_runtime(
|
||||
provider=provider,
|
||||
api_mode="anthropic_messages",
|
||||
model_cfg={"openai_runtime": "codex_app_server"},
|
||||
)
|
||||
assert got == "anthropic_messages", (
|
||||
f"provider={provider!r} should not be rerouted to codex_app_server"
|
||||
)
|
||||
|
||||
|
||||
class TestCodexAppServerModule:
|
||||
"""Module-surface tests for the JSON-RPC speaker. Don't require codex CLI."""
|
||||
|
||||
def test_module_imports(self) -> None:
|
||||
from agent.transports import codex_app_server
|
||||
|
||||
assert codex_app_server.MIN_CODEX_VERSION >= (0, 1, 0)
|
||||
assert callable(codex_app_server.parse_codex_version)
|
||||
assert callable(codex_app_server.check_codex_binary)
|
||||
|
||||
def test_parse_codex_version_valid(self) -> None:
|
||||
from agent.transports.codex_app_server import parse_codex_version
|
||||
|
||||
assert parse_codex_version("codex-cli 0.130.0") == (0, 130, 0)
|
||||
assert parse_codex_version("codex-cli 1.2.3 (extra metadata)") == (1, 2, 3)
|
||||
assert parse_codex_version("codex 99.0.1\n") == (99, 0, 1)
|
||||
|
||||
def test_parse_codex_version_invalid(self) -> None:
|
||||
from agent.transports.codex_app_server import parse_codex_version
|
||||
|
||||
assert parse_codex_version("nope") is None
|
||||
assert parse_codex_version("") is None
|
||||
assert parse_codex_version(None) is None # type: ignore[arg-type]
|
||||
|
||||
def test_check_binary_handles_missing_executable(self) -> None:
|
||||
from agent.transports.codex_app_server import check_codex_binary
|
||||
|
||||
ok, msg = check_codex_binary(codex_bin="/nonexistent/codex/binary/path")
|
||||
assert ok is False
|
||||
assert "not found" in msg.lower() or "no such" in msg.lower()
|
||||
|
||||
def test_codex_error_class_is_runtimeerror(self) -> None:
|
||||
from agent.transports.codex_app_server import CodexAppServerError
|
||||
|
||||
err = CodexAppServerError(code=-32600, message="boom")
|
||||
assert isinstance(err, RuntimeError)
|
||||
assert "boom" in str(err)
|
||||
assert "-32600" in str(err)
|
||||
|
||||
|
||||
class TestSpawnEnvIsolation:
|
||||
"""The codex spawn must NOT rewrite HOME — codex's shell tool spawns
|
||||
subprocesses (gh, git, npm, aws, gcloud, ...) that need to find their
|
||||
config in the real user $HOME. CODEX_HOME isolates codex's own state,
|
||||
HOME stays unchanged.
|
||||
|
||||
OpenClaw hit this footgun (openclaw/openclaw#81562) — they were
|
||||
rewriting HOME to a synthetic per-agent dir alongside CODEX_HOME,
|
||||
and then `gh auth status` / git config / etc. all broke inside codex
|
||||
shell calls. We avoid the same bug by only overlaying CODEX_HOME and
|
||||
RUST_LOG on top of os.environ.copy().
|
||||
"""
|
||||
|
||||
def test_spawn_env_preserves_HOME(self, monkeypatch):
|
||||
"""The spawn env must contain the parent process's HOME unchanged.
|
||||
Verifies via a subprocess-monkey-patch."""
|
||||
import subprocess
|
||||
from agent.transports import codex_app_server as cas
|
||||
|
||||
captured = {}
|
||||
|
||||
class FakePopen:
|
||||
def __init__(self, cmd, *args, **kwargs):
|
||||
captured["env"] = kwargs.get("env", {}).copy()
|
||||
# Provide minimal Popen surface so __init__ doesn't crash
|
||||
# on attribute access during construction.
|
||||
self.stdin = None
|
||||
self.stdout = None
|
||||
self.stderr = None
|
||||
self.pid = 1
|
||||
self.returncode = None
|
||||
|
||||
def poll(self):
|
||||
return None
|
||||
|
||||
def terminate(self):
|
||||
pass
|
||||
|
||||
def wait(self, timeout=None):
|
||||
return 0
|
||||
|
||||
def kill(self):
|
||||
pass
|
||||
|
||||
monkeypatch.setattr(subprocess, "Popen", FakePopen)
|
||||
monkeypatch.setenv("HOME", "/users/alice")
|
||||
|
||||
client = cas.CodexAppServerClient(codex_bin="codex")
|
||||
client._closed = True # so close() is a no-op
|
||||
|
||||
# The spawn env must have HOME=/users/alice unchanged
|
||||
assert captured["env"].get("HOME") == "/users/alice", (
|
||||
f"HOME got rewritten in codex spawn env: "
|
||||
f"{captured['env'].get('HOME')!r}. Codex's shell tool's "
|
||||
"subprocesses (gh, git, aws, npm) need the user's real HOME."
|
||||
)
|
||||
|
||||
def test_spawn_env_sets_CODEX_HOME_when_provided(self, monkeypatch):
|
||||
"""CODEX_HOME isolation must still work — that's the whole point
|
||||
of the codex_home arg."""
|
||||
import subprocess
|
||||
from agent.transports import codex_app_server as cas
|
||||
|
||||
captured = {}
|
||||
|
||||
class FakePopen:
|
||||
def __init__(self, cmd, *args, **kwargs):
|
||||
captured["env"] = kwargs.get("env", {}).copy()
|
||||
self.stdin = None
|
||||
self.stdout = None
|
||||
self.stderr = None
|
||||
self.pid = 1
|
||||
self.returncode = None
|
||||
|
||||
def poll(self):
|
||||
return None
|
||||
|
||||
def terminate(self):
|
||||
pass
|
||||
|
||||
def wait(self, timeout=None):
|
||||
return 0
|
||||
|
||||
def kill(self):
|
||||
pass
|
||||
|
||||
monkeypatch.setattr(subprocess, "Popen", FakePopen)
|
||||
monkeypatch.setenv("HOME", "/users/alice")
|
||||
|
||||
client = cas.CodexAppServerClient(
|
||||
codex_bin="codex", codex_home="/tmp/profile/codex"
|
||||
)
|
||||
client._closed = True
|
||||
|
||||
assert captured["env"].get("CODEX_HOME") == "/tmp/profile/codex"
|
||||
# And HOME still passes through unchanged
|
||||
assert captured["env"].get("HOME") == "/users/alice"
|
||||
502
tests/agent/transports/test_codex_app_server_session.py
Normal file
502
tests/agent/transports/test_codex_app_server_session.py
Normal file
|
|
@ -0,0 +1,502 @@
|
|||
"""Tests for CodexAppServerSession — drive turns through a mock client.
|
||||
|
||||
The session adapter has the most complex behavior of the three new modules:
|
||||
notification draining, server-request handling (approvals), interrupt,
|
||||
deadline timeouts. These tests pin all of that without spawning real codex.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
from typing import Any, Optional
|
||||
|
||||
import pytest
|
||||
|
||||
from agent.transports.codex_app_server_session import (
|
||||
CodexAppServerSession,
|
||||
TurnResult,
|
||||
_ServerRequestRouting,
|
||||
_approval_choice_to_codex_decision,
|
||||
)
|
||||
|
||||
|
||||
class FakeClient:
|
||||
"""Stand-in for CodexAppServerClient that records calls and lets the test
|
||||
drive the notification / server-request streams synchronously."""
|
||||
|
||||
def __init__(self, *, codex_bin: str = "codex", codex_home=None) -> None:
|
||||
self.codex_bin = codex_bin
|
||||
self.codex_home = codex_home
|
||||
self.requests: list[tuple[str, dict]] = []
|
||||
self.notifications_responses: list[dict] = []
|
||||
self.responses: list[tuple[Any, dict]] = []
|
||||
self.error_responses: list[tuple[Any, int, str]] = []
|
||||
self._initialized = False
|
||||
self._closed = False
|
||||
self._notifications: list[dict] = []
|
||||
self._server_requests: list[dict] = []
|
||||
self._request_handler = None # Optional[Callable[[str, dict], dict]]
|
||||
|
||||
# API matching CodexAppServerClient
|
||||
def initialize(self, **kwargs):
|
||||
self._initialized = True
|
||||
return {"userAgent": "fake/0.0.0", "codexHome": "/tmp",
|
||||
"platformOs": "linux", "platformFamily": "unix"}
|
||||
|
||||
def request(self, method: str, params: Optional[dict] = None, timeout: float = 30.0):
|
||||
self.requests.append((method, params or {}))
|
||||
if self._request_handler is not None:
|
||||
return self._request_handler(method, params or {})
|
||||
# Sensible defaults for protocol methods used by the session
|
||||
if method == "thread/start":
|
||||
return {"thread": {"id": "thread-fake-001"},
|
||||
"activePermissionProfile": {"id": "workspace-write"}}
|
||||
if method == "turn/start":
|
||||
return {"turn": {"id": "turn-fake-001"}}
|
||||
if method == "turn/interrupt":
|
||||
return {}
|
||||
return {}
|
||||
|
||||
def notify(self, method: str, params=None):
|
||||
pass
|
||||
|
||||
def respond(self, request_id, result):
|
||||
self.responses.append((request_id, result))
|
||||
|
||||
def respond_error(self, request_id, code, message, data=None):
|
||||
self.error_responses.append((request_id, code, message))
|
||||
|
||||
def take_notification(self, timeout: float = 0.0):
|
||||
if self._notifications:
|
||||
return self._notifications.pop(0)
|
||||
# Honor a tiny sleep so the loop doesn't hot-spin; the real client
|
||||
# blocks on a queue. For tests we want determinism.
|
||||
if timeout > 0:
|
||||
time.sleep(min(timeout, 0.001))
|
||||
return None
|
||||
|
||||
def take_server_request(self, timeout: float = 0.0):
|
||||
if self._server_requests:
|
||||
return self._server_requests.pop(0)
|
||||
return None
|
||||
|
||||
def close(self):
|
||||
self._closed = True
|
||||
|
||||
# Test helpers
|
||||
def queue_notification(self, method: str, **params):
|
||||
self._notifications.append({"method": method, "params": params})
|
||||
|
||||
def queue_server_request(self, method: str, request_id: Any = "srv-1", **params):
|
||||
self._server_requests.append({"id": request_id, "method": method, "params": params})
|
||||
|
||||
|
||||
def make_session(client: FakeClient, **kwargs) -> CodexAppServerSession:
|
||||
return CodexAppServerSession(
|
||||
cwd="/tmp",
|
||||
client_factory=lambda **kw: client,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
# ---- choice mapping ----
|
||||
|
||||
class TestApprovalChoiceMapping:
|
||||
@pytest.mark.parametrize("choice,expected", [
|
||||
("once", "accept"),
|
||||
("session", "acceptForSession"),
|
||||
("always", "acceptForSession"),
|
||||
("deny", "decline"),
|
||||
("anything-else", "decline"),
|
||||
])
|
||||
def test_mapping(self, choice, expected):
|
||||
assert _approval_choice_to_codex_decision(choice) == expected
|
||||
|
||||
|
||||
# ---- lifecycle ----
|
||||
|
||||
class TestLifecycle:
|
||||
def test_ensure_started_is_idempotent(self):
|
||||
client = FakeClient()
|
||||
s = make_session(client)
|
||||
tid_a = s.ensure_started()
|
||||
tid_b = s.ensure_started()
|
||||
assert tid_a == tid_b == "thread-fake-001"
|
||||
# thread/start should be called exactly once
|
||||
method_calls = [m for (m, _) in client.requests if m == "thread/start"]
|
||||
assert len(method_calls) == 1
|
||||
|
||||
def test_thread_start_passes_cwd_only(self):
|
||||
"""thread/start carries cwd. We intentionally do NOT pass `permissions`
|
||||
on this codex version (experimentalApi-gated + requires matching
|
||||
config.toml [permissions] table). Letting codex use its default
|
||||
(read-only unless user configures otherwise) is the documented path."""
|
||||
client = FakeClient()
|
||||
s = make_session(client, permission_profile="workspace-write")
|
||||
s.ensure_started()
|
||||
method, params = next(r for r in client.requests if r[0] == "thread/start")
|
||||
assert params["cwd"] == "/tmp"
|
||||
assert "permissions" not in params # see session.ensure_started() comment
|
||||
|
||||
def test_close_idempotent(self):
|
||||
client = FakeClient()
|
||||
s = make_session(client)
|
||||
s.ensure_started()
|
||||
s.close()
|
||||
s.close()
|
||||
assert client._closed is True
|
||||
|
||||
|
||||
# ---- turn loop ----
|
||||
|
||||
class TestRunTurn:
|
||||
def test_simple_text_turn_returns_final_message(self):
|
||||
client = FakeClient()
|
||||
client.queue_notification("turn/started", threadId="t", turn={"id": "tu1"})
|
||||
client.queue_notification(
|
||||
"item/completed",
|
||||
item={"type": "agentMessage", "id": "m1", "text": "hello world"},
|
||||
threadId="t", turnId="tu1",
|
||||
)
|
||||
client.queue_notification(
|
||||
"turn/completed",
|
||||
threadId="t",
|
||||
turn={"id": "tu1", "status": "completed", "error": None},
|
||||
)
|
||||
s = make_session(client)
|
||||
r = s.run_turn("hi", turn_timeout=2.0)
|
||||
assert r.final_text == "hello world"
|
||||
assert r.interrupted is False
|
||||
assert r.error is None
|
||||
assert any(m["role"] == "assistant" and m.get("content") == "hello world"
|
||||
for m in r.projected_messages)
|
||||
# turn_id propagated for downstream session-DB linkage
|
||||
assert r.turn_id == "turn-fake-001"
|
||||
|
||||
def test_tool_iteration_counter_ticks(self):
|
||||
client = FakeClient()
|
||||
# Two completed exec items + one final agent message
|
||||
for i, item_id in enumerate(("ex1", "ex2"), start=1):
|
||||
client.queue_notification(
|
||||
"item/completed",
|
||||
item={
|
||||
"type": "commandExecution", "id": item_id,
|
||||
"command": f"cmd{i}", "cwd": "/tmp",
|
||||
"status": "completed", "aggregatedOutput": "ok",
|
||||
"exitCode": 0, "commandActions": [],
|
||||
},
|
||||
threadId="t", turnId="tu1",
|
||||
)
|
||||
client.queue_notification(
|
||||
"item/completed",
|
||||
item={"type": "agentMessage", "id": "m1", "text": "done"},
|
||||
threadId="t", turnId="tu1",
|
||||
)
|
||||
client.queue_notification(
|
||||
"turn/completed", threadId="t",
|
||||
turn={"id": "tu1", "status": "completed", "error": None},
|
||||
)
|
||||
s = make_session(client)
|
||||
r = s.run_turn("do stuff", turn_timeout=2.0)
|
||||
assert r.tool_iterations == 2
|
||||
# Each tool item produces (assistant, tool) — 2*2 + final assistant = 5 msgs
|
||||
assert len(r.projected_messages) == 5
|
||||
|
||||
def test_turn_start_failure_returns_error(self):
|
||||
client = FakeClient()
|
||||
from agent.transports.codex_app_server import CodexAppServerError
|
||||
|
||||
def boom(method, params):
|
||||
if method == "turn/start":
|
||||
raise CodexAppServerError(code=-32600, message="bad input")
|
||||
return {"thread": {"id": "t"}, "activePermissionProfile": {"id": "x"}}
|
||||
|
||||
client._request_handler = boom
|
||||
s = make_session(client)
|
||||
r = s.run_turn("hi", turn_timeout=2.0)
|
||||
assert r.error is not None
|
||||
assert "bad input" in r.error
|
||||
assert r.final_text == ""
|
||||
|
||||
def test_interrupt_during_turn_issues_turn_interrupt(self):
|
||||
client = FakeClient()
|
||||
# Don't queue turn/completed — the loop has to interrupt out
|
||||
client.queue_notification(
|
||||
"item/completed",
|
||||
item={"type": "commandExecution", "id": "x", "command": "sleep 60",
|
||||
"cwd": "/", "status": "inProgress",
|
||||
"aggregatedOutput": None, "exitCode": None,
|
||||
"commandActions": []},
|
||||
threadId="t", turnId="tu1",
|
||||
)
|
||||
s = make_session(client)
|
||||
s.ensure_started()
|
||||
# Trip the interrupt before run_turn even consumes the notification.
|
||||
# The loop will see interrupt set on its first iteration and bail.
|
||||
s.request_interrupt()
|
||||
r = s.run_turn("loop forever", turn_timeout=2.0)
|
||||
assert r.interrupted is True
|
||||
# turn/interrupt was requested with the right turnId
|
||||
assert any(
|
||||
method == "turn/interrupt" and params.get("turnId") == "turn-fake-001"
|
||||
for (method, params) in client.requests
|
||||
)
|
||||
|
||||
def test_deadline_exceeded_records_error(self):
|
||||
client = FakeClient()
|
||||
# No notifications and no completion → must hit deadline
|
||||
s = make_session(client)
|
||||
r = s.run_turn("never finishes", turn_timeout=0.05,
|
||||
notification_poll_timeout=0.01)
|
||||
assert r.interrupted is True
|
||||
assert r.error and "timed out" in r.error
|
||||
|
||||
def test_failed_turn_records_error_from_turn_completed(self):
|
||||
client = FakeClient()
|
||||
client.queue_notification(
|
||||
"turn/completed", threadId="t",
|
||||
turn={"id": "tu1", "status": "failed",
|
||||
"error": {"message": "model error"}},
|
||||
)
|
||||
s = make_session(client)
|
||||
r = s.run_turn("x", turn_timeout=1.0)
|
||||
assert r.error and "model error" in r.error
|
||||
|
||||
|
||||
# ---- approval bridge ----
|
||||
|
||||
class TestServerRequestRouting:
|
||||
def test_exec_approval_with_callback_approves_once(self):
|
||||
client = FakeClient()
|
||||
client.queue_server_request(
|
||||
"item/commandExecution/requestApproval", request_id="req-1",
|
||||
command="ls /tmp", cwd="/tmp",
|
||||
)
|
||||
client.queue_notification(
|
||||
"turn/completed", threadId="t",
|
||||
turn={"id": "tu1", "status": "completed", "error": None},
|
||||
)
|
||||
|
||||
captured: dict = {}
|
||||
|
||||
def cb(command, description, *, allow_permanent=True):
|
||||
captured["command"] = command
|
||||
captured["description"] = description
|
||||
return "once"
|
||||
|
||||
s = make_session(client, approval_callback=cb)
|
||||
s.run_turn("hi", turn_timeout=1.0)
|
||||
assert captured["command"] == "ls /tmp"
|
||||
# The session must have responded to the server request with "accept"
|
||||
assert ("req-1", {"decision": "accept"}) in client.responses
|
||||
|
||||
def test_exec_approval_no_callback_denies(self):
|
||||
client = FakeClient()
|
||||
client.queue_server_request("item/commandExecution/requestApproval", request_id="req-1",
|
||||
command="rm -rf /", cwd="/")
|
||||
client.queue_notification(
|
||||
"turn/completed", threadId="t",
|
||||
turn={"id": "tu1", "status": "completed", "error": None},
|
||||
)
|
||||
s = make_session(client) # no approval_callback wired
|
||||
s.run_turn("hi", turn_timeout=1.0)
|
||||
assert ("req-1", {"decision": "decline"}) in client.responses
|
||||
|
||||
def test_apply_patch_approval_session_maps_to_session_decision(self):
|
||||
client = FakeClient()
|
||||
client.queue_server_request(
|
||||
"item/fileChange/requestApproval", request_id="req-2",
|
||||
itemId="fc-1",
|
||||
turnId="t1",
|
||||
threadId="th",
|
||||
startedAtMs=1234567890,
|
||||
reason="create new file with hello() function",
|
||||
)
|
||||
client.queue_notification(
|
||||
"turn/completed", threadId="t",
|
||||
turn={"id": "tu1", "status": "completed", "error": None},
|
||||
)
|
||||
|
||||
def cb(command, description, *, allow_permanent=True):
|
||||
return "session"
|
||||
|
||||
s = make_session(client, approval_callback=cb)
|
||||
s.run_turn("hi", turn_timeout=1.0)
|
||||
assert ("req-2", {"decision": "acceptForSession"}) in client.responses
|
||||
|
||||
def test_unknown_server_request_replied_with_error(self):
|
||||
client = FakeClient()
|
||||
client.queue_server_request("totally/unknown", request_id="req-3")
|
||||
client.queue_notification(
|
||||
"turn/completed", threadId="t",
|
||||
turn={"id": "tu1", "status": "completed", "error": None},
|
||||
)
|
||||
s = make_session(client)
|
||||
s.run_turn("hi", turn_timeout=1.0)
|
||||
assert any(
|
||||
rid == "req-3" and code == -32601
|
||||
for (rid, code, _msg) in client.error_responses
|
||||
)
|
||||
|
||||
def test_mcp_elicitation_for_hermes_tools_auto_accepts(self):
|
||||
"""When codex elicits on behalf of hermes-tools (our own callback),
|
||||
accept automatically — the user already opted in by enabling the
|
||||
runtime."""
|
||||
client = FakeClient()
|
||||
client.queue_server_request(
|
||||
"mcpServer/elicitation/request", request_id="elic-1",
|
||||
threadId="t", turnId="tu1",
|
||||
serverName="hermes-tools",
|
||||
mode="form",
|
||||
message="confirm",
|
||||
requestedSchema={"type": "object", "properties": {}},
|
||||
)
|
||||
client.queue_notification(
|
||||
"turn/completed", threadId="t",
|
||||
turn={"id": "tu1", "status": "completed", "error": None},
|
||||
)
|
||||
s = make_session(client)
|
||||
s.run_turn("hi", turn_timeout=1.0)
|
||||
assert ("elic-1", {"action": "accept", "content": None, "_meta": None}) in client.responses
|
||||
|
||||
def test_mcp_elicitation_for_other_servers_declines(self):
|
||||
"""For third-party MCP servers we decline by default so users
|
||||
explicitly opt in through codex's own UI."""
|
||||
client = FakeClient()
|
||||
client.queue_server_request(
|
||||
"mcpServer/elicitation/request", request_id="elic-2",
|
||||
threadId="t", turnId="tu1",
|
||||
serverName="some-third-party",
|
||||
mode="url",
|
||||
message="please log in",
|
||||
url="https://example.com/oauth",
|
||||
)
|
||||
client.queue_notification(
|
||||
"turn/completed", threadId="t",
|
||||
turn={"id": "tu1", "status": "completed", "error": None},
|
||||
)
|
||||
s = make_session(client)
|
||||
s.run_turn("hi", turn_timeout=1.0)
|
||||
assert ("elic-2", {"action": "decline", "content": None, "_meta": None}) in client.responses
|
||||
|
||||
def test_routing_auto_approve_bypass(self):
|
||||
client = FakeClient()
|
||||
client.queue_server_request("item/commandExecution/requestApproval", request_id="r1",
|
||||
command="ls", cwd="/")
|
||||
client.queue_notification(
|
||||
"turn/completed", threadId="t",
|
||||
turn={"id": "tu1", "status": "completed", "error": None},
|
||||
)
|
||||
# No callback, but routing says auto-approve. Should approve.
|
||||
s = make_session(client, request_routing=_ServerRequestRouting(
|
||||
auto_approve_exec=True))
|
||||
s.run_turn("hi", turn_timeout=1.0)
|
||||
assert ("r1", {"decision": "accept"}) in client.responses
|
||||
|
||||
def test_callback_raises_falls_back_to_decline(self):
|
||||
client = FakeClient()
|
||||
client.queue_server_request("item/commandExecution/requestApproval", request_id="r1",
|
||||
command="ls", cwd="/")
|
||||
client.queue_notification(
|
||||
"turn/completed", threadId="t",
|
||||
turn={"id": "tu1", "status": "completed", "error": None},
|
||||
)
|
||||
|
||||
def boom(*a, **kw):
|
||||
raise RuntimeError("ui crashed")
|
||||
|
||||
s = make_session(client, approval_callback=boom)
|
||||
s.run_turn("hi", turn_timeout=1.0)
|
||||
# Fail-closed: deny on callback exception
|
||||
assert ("r1", {"decision": "decline"}) in client.responses
|
||||
|
||||
|
||||
# ---- enriched approval prompts ----
|
||||
|
||||
class TestApprovalPromptEnrichment:
|
||||
"""Quirk #4: apply_patch prompt should show what's changing.
|
||||
Quirk #10: exec prompt should never show empty cwd."""
|
||||
|
||||
def test_exec_falls_back_to_session_cwd(self):
|
||||
"""When codex omits cwd from the approval params, the prompt shows
|
||||
the session cwd, not an empty string."""
|
||||
client = FakeClient()
|
||||
client.queue_server_request(
|
||||
"item/commandExecution/requestApproval", request_id="r1",
|
||||
command="ls", # no cwd
|
||||
)
|
||||
client.queue_notification(
|
||||
"turn/completed", threadId="t",
|
||||
turn={"id": "tu1", "status": "completed", "error": None},
|
||||
)
|
||||
captured = {}
|
||||
def cb(command, description, *, allow_permanent=True):
|
||||
captured["description"] = description
|
||||
return "once"
|
||||
s = make_session(client, approval_callback=cb)
|
||||
s.run_turn("hi", turn_timeout=1.0)
|
||||
# Session cwd is /tmp by default in make_session()
|
||||
assert "/tmp" in captured["description"]
|
||||
assert "Codex requests exec in <unknown>" not in captured["description"]
|
||||
|
||||
def test_apply_patch_prompt_summarizes_pending_changes(self):
|
||||
"""When the projector has cached the fileChange item from item/started,
|
||||
the approval prompt surfaces the change summary."""
|
||||
client = FakeClient()
|
||||
# item/started fires first (carries the changes), then approval request
|
||||
client.queue_notification(
|
||||
"item/started",
|
||||
item={"type": "fileChange", "id": "fc-1",
|
||||
"changes": [
|
||||
{"kind": {"type": "add"}, "path": "/tmp/new.py"},
|
||||
{"kind": {"type": "update"}, "path": "/tmp/old.py"},
|
||||
]},
|
||||
threadId="t", turnId="tu1",
|
||||
)
|
||||
client.queue_server_request(
|
||||
"item/fileChange/requestApproval", request_id="req-2",
|
||||
itemId="fc-1", turnId="tu1", threadId="t",
|
||||
startedAtMs=1234567890,
|
||||
reason="add and update files",
|
||||
)
|
||||
client.queue_notification(
|
||||
"turn/completed", threadId="t",
|
||||
turn={"id": "tu1", "status": "completed", "error": None},
|
||||
)
|
||||
captured = {}
|
||||
def cb(command, description, *, allow_permanent=True):
|
||||
captured["command"] = command
|
||||
captured["description"] = description
|
||||
return "once"
|
||||
s = make_session(client, approval_callback=cb)
|
||||
s.run_turn("hi", turn_timeout=1.0)
|
||||
# Both add and update kinds should be in the summary
|
||||
assert "1 add" in captured["command"] or "1 add" in captured["description"]
|
||||
assert "1 update" in captured["command"] or "1 update" in captured["description"]
|
||||
# And at least one of the paths
|
||||
joined = captured["command"] + " " + captured["description"]
|
||||
assert "/tmp/new.py" in joined or "/tmp/old.py" in joined
|
||||
|
||||
def test_apply_patch_prompt_works_without_cached_summary(self):
|
||||
"""When approval arrives before item/started (or without changes
|
||||
info), prompt falls back to whatever codex provided."""
|
||||
client = FakeClient()
|
||||
client.queue_server_request(
|
||||
"item/fileChange/requestApproval", request_id="req-2",
|
||||
itemId="fc-orphan", turnId="tu1", threadId="t",
|
||||
startedAtMs=1234567890,
|
||||
reason="apply some changes",
|
||||
)
|
||||
client.queue_notification(
|
||||
"turn/completed", threadId="t",
|
||||
turn={"id": "tu1", "status": "completed", "error": None},
|
||||
)
|
||||
captured = {}
|
||||
def cb(command, description, *, allow_permanent=True):
|
||||
captured["command"] = command
|
||||
return "once"
|
||||
s = make_session(client, approval_callback=cb)
|
||||
s.run_turn("hi", turn_timeout=1.0)
|
||||
# Falls back to the reason
|
||||
assert "apply some changes" in captured["command"]
|
||||
303
tests/agent/transports/test_codex_event_projector.py
Normal file
303
tests/agent/transports/test_codex_event_projector.py
Normal file
|
|
@ -0,0 +1,303 @@
|
|||
"""Tests for CodexEventProjector — codex item/* events → Hermes messages list.
|
||||
|
||||
Drives projection against fixture notifications captured from codex 0.130.0
|
||||
plus synthetic ones for item types we couldn't auth-test live."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
from agent.transports.codex_event_projector import (
|
||||
CodexEventProjector,
|
||||
ProjectionResult,
|
||||
_deterministic_call_id,
|
||||
_format_tool_args,
|
||||
)
|
||||
|
||||
|
||||
# --- Fixture: real `commandExecution` notification captured from codex 0.130.0
|
||||
COMMAND_EXEC_COMPLETED = {
|
||||
"method": "item/completed",
|
||||
"params": {
|
||||
"item": {
|
||||
"type": "commandExecution",
|
||||
"id": "f8a75c66-a89e-4fd7-8bcf-2d58e664fa9e",
|
||||
"command": "/bin/bash -lc 'echo hello && ls /tmp | head -3'",
|
||||
"cwd": "/tmp",
|
||||
"processId": None,
|
||||
"source": "userShell",
|
||||
"status": "completed",
|
||||
"commandActions": [
|
||||
{"type": "listFiles", "command": "ls /tmp", "path": "tmp"}
|
||||
],
|
||||
"aggregatedOutput": "hello\naa_lang.json\n",
|
||||
"exitCode": 0,
|
||||
"durationMs": 10,
|
||||
},
|
||||
"threadId": "019e1a94-352b-71e1-b214-e5c67c9ec190",
|
||||
"turnId": "019e1a94-3553-7940-8af3-4ca57142deb7",
|
||||
"completedAtMs": 1778562381151,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class TestProjectionInvariants:
|
||||
"""Universal invariants that must hold across all projection paths."""
|
||||
|
||||
def test_streaming_deltas_dont_materialize(self) -> None:
|
||||
p = CodexEventProjector()
|
||||
for delta_method in (
|
||||
"item/commandExecution/outputDelta",
|
||||
"item/agentMessage/delta",
|
||||
"item/reasoning/delta",
|
||||
):
|
||||
r = p.project({"method": delta_method, "params": {"delta": "x"}})
|
||||
assert r.messages == [], (
|
||||
f"{delta_method} should NOT produce messages — only "
|
||||
f"item/completed materializes"
|
||||
)
|
||||
assert r.is_tool_iteration is False
|
||||
assert r.final_text is None
|
||||
|
||||
def test_turn_started_and_completed_are_silent(self) -> None:
|
||||
p = CodexEventProjector()
|
||||
for method in ("turn/started", "turn/completed", "thread/started"):
|
||||
r = p.project({"method": method, "params": {}})
|
||||
assert r.messages == []
|
||||
|
||||
def test_unknown_method_silent(self) -> None:
|
||||
p = CodexEventProjector()
|
||||
r = p.project({"method": "totally/unknown", "params": {}})
|
||||
assert r.messages == []
|
||||
|
||||
|
||||
class TestCommandExecutionProjection:
|
||||
"""Real captured notification → assistant tool_call + tool result."""
|
||||
|
||||
def test_command_completed_produces_two_messages(self) -> None:
|
||||
p = CodexEventProjector()
|
||||
r = p.project(COMMAND_EXEC_COMPLETED)
|
||||
assert len(r.messages) == 2
|
||||
assert r.is_tool_iteration is True
|
||||
|
||||
def test_first_message_is_assistant_tool_call(self) -> None:
|
||||
p = CodexEventProjector()
|
||||
msgs = p.project(COMMAND_EXEC_COMPLETED).messages
|
||||
assistant = msgs[0]
|
||||
assert assistant["role"] == "assistant"
|
||||
assert assistant["content"] is None
|
||||
assert len(assistant["tool_calls"]) == 1
|
||||
tc = assistant["tool_calls"][0]
|
||||
assert tc["type"] == "function"
|
||||
assert tc["function"]["name"] == "exec_command"
|
||||
args = json.loads(tc["function"]["arguments"])
|
||||
assert "echo hello" in args["command"]
|
||||
assert args["cwd"] == "/tmp"
|
||||
|
||||
def test_second_message_is_tool_result_correlating_by_id(self) -> None:
|
||||
p = CodexEventProjector()
|
||||
msgs = p.project(COMMAND_EXEC_COMPLETED).messages
|
||||
assistant, tool = msgs
|
||||
assert tool["role"] == "tool"
|
||||
assert tool["tool_call_id"] == assistant["tool_calls"][0]["id"]
|
||||
assert "hello" in tool["content"]
|
||||
|
||||
def test_nonzero_exit_code_annotated_in_tool_result(self) -> None:
|
||||
item = {**COMMAND_EXEC_COMPLETED["params"]["item"], "exitCode": 2,
|
||||
"aggregatedOutput": "boom"}
|
||||
notif = {
|
||||
"method": "item/completed",
|
||||
"params": {**COMMAND_EXEC_COMPLETED["params"], "item": item},
|
||||
}
|
||||
p = CodexEventProjector()
|
||||
msgs = p.project(notif).messages
|
||||
assert "[exit 2]" in msgs[1]["content"]
|
||||
assert "boom" in msgs[1]["content"]
|
||||
|
||||
def test_deterministic_call_id_across_replay(self) -> None:
|
||||
# Same item id → same call_id (prefix cache must stay valid).
|
||||
p1 = CodexEventProjector()
|
||||
p2 = CodexEventProjector()
|
||||
a = p1.project(COMMAND_EXEC_COMPLETED).messages
|
||||
b = p2.project(COMMAND_EXEC_COMPLETED).messages
|
||||
assert a[0]["tool_calls"][0]["id"] == b[0]["tool_calls"][0]["id"]
|
||||
|
||||
|
||||
class TestAgentMessageProjection:
|
||||
"""assistant text → final_text + assistant message."""
|
||||
|
||||
def test_agent_message_projects_to_assistant(self) -> None:
|
||||
p = CodexEventProjector()
|
||||
r = p.project({
|
||||
"method": "item/completed",
|
||||
"params": {"item": {"type": "agentMessage", "id": "x",
|
||||
"text": "hi there"}},
|
||||
})
|
||||
assert r.final_text == "hi there"
|
||||
assert r.messages == [{"role": "assistant", "content": "hi there"}]
|
||||
assert r.is_tool_iteration is False
|
||||
|
||||
def test_pending_reasoning_attaches_to_next_assistant_message(self) -> None:
|
||||
p = CodexEventProjector()
|
||||
# First a reasoning item lands
|
||||
r1 = p.project({
|
||||
"method": "item/completed",
|
||||
"params": {"item": {"type": "reasoning", "id": "r1",
|
||||
"summary": ["thinking..."],
|
||||
"content": ["step 1", "step 2"]}},
|
||||
})
|
||||
assert r1.messages == [] # reasoning alone produces no message
|
||||
# Then the assistant message
|
||||
r2 = p.project({
|
||||
"method": "item/completed",
|
||||
"params": {"item": {"type": "agentMessage", "id": "a1",
|
||||
"text": "ok"}},
|
||||
})
|
||||
assistant = r2.messages[0]
|
||||
assert "reasoning" in assistant
|
||||
assert "thinking" in assistant["reasoning"]
|
||||
assert "step 1" in assistant["reasoning"]
|
||||
|
||||
def test_reasoning_consumed_after_attaching(self) -> None:
|
||||
p = CodexEventProjector()
|
||||
p.project({"method": "item/completed", "params": {"item": {
|
||||
"type": "reasoning", "id": "r1", "summary": ["once"], "content": []}}})
|
||||
first = p.project({"method": "item/completed", "params": {"item": {
|
||||
"type": "agentMessage", "id": "a", "text": "first"}}}).messages[0]
|
||||
second = p.project({"method": "item/completed", "params": {"item": {
|
||||
"type": "agentMessage", "id": "b", "text": "second"}}}).messages[0]
|
||||
assert "reasoning" in first
|
||||
assert "reasoning" not in second
|
||||
|
||||
|
||||
class TestFileChangeProjection:
|
||||
def test_file_change_summary_no_inlined_content(self) -> None:
|
||||
item = {
|
||||
"type": "fileChange",
|
||||
"id": "fc1",
|
||||
"status": "applied",
|
||||
"changes": [
|
||||
{"kind": {"type": "add"}, "path": "/tmp/new.py"},
|
||||
{"kind": {"type": "update"}, "path": "/tmp/old.py"},
|
||||
],
|
||||
}
|
||||
p = CodexEventProjector()
|
||||
msgs = p.project({"method": "item/completed",
|
||||
"params": {"item": item}}).messages
|
||||
assert len(msgs) == 2
|
||||
tc = msgs[0]["tool_calls"][0]
|
||||
assert tc["function"]["name"] == "apply_patch"
|
||||
args = json.loads(tc["function"]["arguments"])
|
||||
assert len(args["changes"]) == 2
|
||||
assert all("kind" in c and "path" in c for c in args["changes"])
|
||||
assert "applied" in msgs[1]["content"]
|
||||
|
||||
|
||||
class TestMcpToolCallProjection:
|
||||
def test_mcp_tool_call_namespaced(self) -> None:
|
||||
item = {
|
||||
"type": "mcpToolCall",
|
||||
"id": "m1",
|
||||
"server": "obsidian",
|
||||
"tool": "search_notes",
|
||||
"status": "completed",
|
||||
"arguments": {"query": "hermes"},
|
||||
"result": {"content": [{"text": "found"}]},
|
||||
"error": None,
|
||||
}
|
||||
msgs = CodexEventProjector().project(
|
||||
{"method": "item/completed", "params": {"item": item}}
|
||||
).messages
|
||||
assert msgs[0]["tool_calls"][0]["function"]["name"] == "mcp.obsidian.search_notes"
|
||||
assert "found" in msgs[1]["content"]
|
||||
|
||||
def test_mcp_error_surfaced(self) -> None:
|
||||
item = {
|
||||
"type": "mcpToolCall", "id": "m2",
|
||||
"server": "x", "tool": "y", "status": "failed",
|
||||
"arguments": {}, "result": None,
|
||||
"error": {"code": -1, "message": "no"},
|
||||
}
|
||||
msgs = CodexEventProjector().project(
|
||||
{"method": "item/completed", "params": {"item": item}}
|
||||
).messages
|
||||
assert "error" in msgs[1]["content"]
|
||||
|
||||
|
||||
class TestUserAndOpaqueProjection:
|
||||
def test_user_message_text_fragments_only(self) -> None:
|
||||
item = {
|
||||
"type": "userMessage", "id": "u1",
|
||||
"content": [
|
||||
{"type": "text", "text": "hello"},
|
||||
{"type": "image", "url": "http://x/y"},
|
||||
{"type": "text", "text": "world"},
|
||||
],
|
||||
}
|
||||
msgs = CodexEventProjector().project(
|
||||
{"method": "item/completed", "params": {"item": item}}
|
||||
).messages
|
||||
assert msgs[0]["role"] == "user"
|
||||
assert "hello" in msgs[0]["content"]
|
||||
assert "world" in msgs[0]["content"]
|
||||
|
||||
def test_opaque_item_recorded_without_fabricated_tool_calls(self) -> None:
|
||||
item = {"type": "plan", "id": "p1", "text": "do the thing"}
|
||||
msgs = CodexEventProjector().project(
|
||||
{"method": "item/completed", "params": {"item": item}}
|
||||
).messages
|
||||
assert len(msgs) == 1
|
||||
assert msgs[0]["role"] == "assistant"
|
||||
assert "plan" in msgs[0]["content"].lower()
|
||||
assert "tool_calls" not in msgs[0]
|
||||
|
||||
|
||||
class TestHelpers:
|
||||
def test_deterministic_call_id_stable(self) -> None:
|
||||
assert _deterministic_call_id("exec", "abc") == _deterministic_call_id("exec", "abc")
|
||||
assert _deterministic_call_id("exec", "abc") != _deterministic_call_id("exec", "xyz")
|
||||
|
||||
def test_deterministic_call_id_handles_missing_id(self) -> None:
|
||||
# Should not raise, should be stable for same item type
|
||||
a = _deterministic_call_id("exec", "")
|
||||
b = _deterministic_call_id("exec", "")
|
||||
assert a == b
|
||||
assert "exec" in a
|
||||
|
||||
def test_format_tool_args_sorted_keys(self) -> None:
|
||||
# Sorted keys = deterministic across replays = prefix cache stays valid
|
||||
a = _format_tool_args({"b": 1, "a": 2})
|
||||
b = _format_tool_args({"a": 2, "b": 1})
|
||||
assert a == b
|
||||
|
||||
|
||||
class TestRoleAlternationInvariant:
|
||||
"""The project must never emit two assistant messages back-to-back from
|
||||
one item — that breaks Hermes' message alternation invariant."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"item",
|
||||
[
|
||||
{"type": "commandExecution", "id": "c1", "command": "x",
|
||||
"cwd": "/", "status": "completed", "aggregatedOutput": "",
|
||||
"exitCode": 0, "commandActions": []},
|
||||
{"type": "fileChange", "id": "f1", "status": "applied",
|
||||
"changes": []},
|
||||
{"type": "mcpToolCall", "id": "m1", "server": "s", "tool": "t",
|
||||
"status": "completed", "arguments": {}, "result": None,
|
||||
"error": None},
|
||||
{"type": "dynamicToolCall", "id": "d1", "tool": "x",
|
||||
"arguments": {}, "status": "completed",
|
||||
"contentItems": [], "success": True},
|
||||
],
|
||||
)
|
||||
def test_tool_items_emit_assistant_then_tool(self, item) -> None:
|
||||
msgs = CodexEventProjector().project(
|
||||
{"method": "item/completed", "params": {"item": item}}
|
||||
).messages
|
||||
assert len(msgs) == 2
|
||||
assert msgs[0]["role"] == "assistant"
|
||||
assert msgs[1]["role"] == "tool"
|
||||
assert msgs[1]["tool_call_id"] == msgs[0]["tool_calls"][0]["id"]
|
||||
135
tests/agent/transports/test_hermes_tools_mcp_server.py
Normal file
135
tests/agent/transports/test_hermes_tools_mcp_server.py
Normal file
|
|
@ -0,0 +1,135 @@
|
|||
"""Tests for the hermes-tools-as-MCP server module surface.
|
||||
|
||||
We don't run a live MCP session in unit tests — that requires the codex
|
||||
subprocess + client + an event loop. These tests pin the static
|
||||
contract: the module imports, the EXPOSED_TOOLS list is sane, and the
|
||||
build helper assembles a server when the SDK is present.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class TestModuleSurface:
|
||||
def test_module_imports_clean(self):
|
||||
from agent.transports import hermes_tools_mcp_server as m
|
||||
assert callable(m.main)
|
||||
assert callable(m._build_server)
|
||||
assert isinstance(m.EXPOSED_TOOLS, tuple)
|
||||
assert len(m.EXPOSED_TOOLS) > 0
|
||||
|
||||
def test_exposed_tools_are_safe_subset(self):
|
||||
"""We MUST NOT expose tools codex already has, because codex'
|
||||
own builtins are better-integrated with its sandbox + approvals.
|
||||
Specifically: no terminal/shell, no read_file/write_file, no
|
||||
patch — those are codex's built-in tools."""
|
||||
from agent.transports.hermes_tools_mcp_server import EXPOSED_TOOLS
|
||||
forbidden = {
|
||||
"terminal", "shell", "read_file", "write_file", "patch",
|
||||
"search_files", "process",
|
||||
}
|
||||
leaked = forbidden & set(EXPOSED_TOOLS)
|
||||
assert not leaked, (
|
||||
f"these tools must NOT be exposed via the codex callback "
|
||||
f"because codex has built-in equivalents: {leaked}"
|
||||
)
|
||||
|
||||
def test_expected_hermes_specific_tools_listed(self):
|
||||
"""The Hermes-specific tools should be present so users on the
|
||||
codex runtime keep access to them."""
|
||||
from agent.transports.hermes_tools_mcp_server import EXPOSED_TOOLS
|
||||
for required in (
|
||||
"web_search",
|
||||
"web_extract",
|
||||
"browser_navigate",
|
||||
"vision_analyze",
|
||||
"image_generate",
|
||||
"skill_view",
|
||||
):
|
||||
assert required in EXPOSED_TOOLS, f"missing {required!r}"
|
||||
|
||||
def test_agent_loop_tools_not_exposed(self):
|
||||
"""delegate_task / memory / session_search / todo require the
|
||||
running AIAgent context to dispatch, so a stateless MCP callback
|
||||
can't drive them. They must NOT be in EXPOSED_TOOLS."""
|
||||
from agent.transports.hermes_tools_mcp_server import EXPOSED_TOOLS
|
||||
for agent_loop_tool in ("delegate_task", "memory", "session_search", "todo"):
|
||||
assert agent_loop_tool not in EXPOSED_TOOLS, (
|
||||
f"{agent_loop_tool!r} requires the agent loop context "
|
||||
"and can't be reached through a stateless MCP callback"
|
||||
)
|
||||
|
||||
def test_kanban_worker_tools_exposed(self):
|
||||
"""Kanban workers run as `hermes chat -q` subprocesses; if they
|
||||
come up on the codex_app_server runtime, the worker can do the
|
||||
actual work via codex's shell but needs the kanban tools through
|
||||
the MCP callback to report back to the kernel. Without these
|
||||
tools available, the worker would hang at completion time."""
|
||||
from agent.transports.hermes_tools_mcp_server import EXPOSED_TOOLS
|
||||
# Worker handoff tools — every dispatched worker uses at least
|
||||
# one of {complete, block, comment} to close out its task.
|
||||
for worker_tool in (
|
||||
"kanban_complete",
|
||||
"kanban_block",
|
||||
"kanban_comment",
|
||||
"kanban_heartbeat",
|
||||
):
|
||||
assert worker_tool in EXPOSED_TOOLS, (
|
||||
f"{worker_tool!r} missing from codex callback — kanban "
|
||||
"workers on codex_app_server runtime would hang"
|
||||
)
|
||||
|
||||
def test_kanban_orchestrator_tools_exposed(self):
|
||||
"""Orchestrator agents need to dispatch new tasks, query the
|
||||
board, and unblock/link tasks. Exposed so an orchestrator on
|
||||
codex_app_server can do its job."""
|
||||
from agent.transports.hermes_tools_mcp_server import EXPOSED_TOOLS
|
||||
for orch_tool in (
|
||||
"kanban_create",
|
||||
"kanban_show",
|
||||
"kanban_list",
|
||||
"kanban_unblock",
|
||||
"kanban_link",
|
||||
):
|
||||
assert orch_tool in EXPOSED_TOOLS, (
|
||||
f"{orch_tool!r} missing from codex callback"
|
||||
)
|
||||
|
||||
|
||||
class TestMain:
|
||||
def test_main_returns_2_when_mcp_unavailable(self, monkeypatch):
|
||||
"""When the mcp package isn't installed, main() should exit
|
||||
cleanly with code 2 and an install hint, not crash."""
|
||||
import agent.transports.hermes_tools_mcp_server as m
|
||||
|
||||
def boom_build(*a, **kw):
|
||||
raise ImportError("mcp not installed")
|
||||
|
||||
monkeypatch.setattr(m, "_build_server", boom_build)
|
||||
rc = m.main(["--verbose"])
|
||||
assert rc == 2
|
||||
|
||||
def test_main_handles_keyboard_interrupt(self, monkeypatch):
|
||||
import agent.transports.hermes_tools_mcp_server as m
|
||||
|
||||
class FakeServer:
|
||||
def run(self):
|
||||
raise KeyboardInterrupt()
|
||||
|
||||
monkeypatch.setattr(m, "_build_server", lambda: FakeServer())
|
||||
rc = m.main([])
|
||||
assert rc == 0
|
||||
|
||||
def test_main_returns_1_on_runtime_error(self, monkeypatch):
|
||||
import agent.transports.hermes_tools_mcp_server as m
|
||||
|
||||
class CrashingServer:
|
||||
def run(self):
|
||||
raise RuntimeError("boom")
|
||||
|
||||
monkeypatch.setattr(m, "_build_server", lambda: CrashingServer())
|
||||
rc = m.main([])
|
||||
assert rc == 1
|
||||
589
tests/hermes_cli/test_codex_runtime_plugin_migration.py
Normal file
589
tests/hermes_cli/test_codex_runtime_plugin_migration.py
Normal file
|
|
@ -0,0 +1,589 @@
|
|||
"""Tests for the codex MCP plugin migration helper."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from hermes_cli.codex_runtime_plugin_migration import (
|
||||
MIGRATION_MARKER,
|
||||
MigrationReport,
|
||||
_format_toml_value,
|
||||
_strip_existing_managed_block,
|
||||
_translate_one_server,
|
||||
migrate,
|
||||
render_codex_toml_section,
|
||||
)
|
||||
|
||||
|
||||
# ---- per-server translation ----
|
||||
|
||||
class TestTranslateOneServer:
|
||||
def test_stdio_basic(self):
|
||||
cfg, skipped = _translate_one_server("filesystem", {
|
||||
"command": "npx",
|
||||
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
|
||||
"env": {"FOO": "bar"},
|
||||
})
|
||||
assert cfg == {
|
||||
"command": "npx",
|
||||
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
|
||||
"env": {"FOO": "bar"},
|
||||
}
|
||||
assert skipped == []
|
||||
|
||||
def test_stdio_with_cwd(self):
|
||||
cfg, _ = _translate_one_server("custom", {
|
||||
"command": "/usr/bin/myserver",
|
||||
"cwd": "/var/lib/mcp",
|
||||
})
|
||||
assert cfg["cwd"] == "/var/lib/mcp"
|
||||
|
||||
def test_http_basic(self):
|
||||
cfg, skipped = _translate_one_server("api", {
|
||||
"url": "https://x.example/mcp",
|
||||
"headers": {"Authorization": "Bearer abc"},
|
||||
})
|
||||
assert cfg == {
|
||||
"url": "https://x.example/mcp",
|
||||
"http_headers": {"Authorization": "Bearer abc"},
|
||||
}
|
||||
assert skipped == []
|
||||
|
||||
def test_sse_falls_under_streamable_http_with_warning(self):
|
||||
cfg, skipped = _translate_one_server("sse_server", {
|
||||
"url": "http://localhost:8000/sse",
|
||||
"transport": "sse",
|
||||
})
|
||||
assert cfg["url"] == "http://localhost:8000/sse"
|
||||
assert any("sse" in s.lower() for s in skipped)
|
||||
|
||||
def test_timeouts_translate(self):
|
||||
cfg, _ = _translate_one_server("x", {
|
||||
"command": "y",
|
||||
"timeout": 180,
|
||||
"connect_timeout": 30,
|
||||
})
|
||||
assert cfg["tool_timeout_sec"] == 180.0
|
||||
assert cfg["startup_timeout_sec"] == 30.0
|
||||
|
||||
def test_non_numeric_timeout_skipped(self):
|
||||
cfg, skipped = _translate_one_server("x", {
|
||||
"command": "y",
|
||||
"timeout": "not-a-number",
|
||||
})
|
||||
assert "tool_timeout_sec" not in cfg
|
||||
assert any("timeout" in s and "numeric" in s for s in skipped)
|
||||
|
||||
def test_disabled_server_emits_enabled_false(self):
|
||||
cfg, _ = _translate_one_server("x", {
|
||||
"command": "y",
|
||||
"enabled": False,
|
||||
})
|
||||
assert cfg["enabled"] is False
|
||||
|
||||
def test_enabled_true_omitted(self):
|
||||
cfg, _ = _translate_one_server("x", {"command": "y", "enabled": True})
|
||||
assert "enabled" not in cfg # codex defaults to true
|
||||
|
||||
def test_command_and_url_prefers_stdio_warns(self):
|
||||
cfg, skipped = _translate_one_server("x", {
|
||||
"command": "y", "url": "http://z",
|
||||
})
|
||||
assert "command" in cfg
|
||||
assert "url" not in cfg
|
||||
assert any("url" in s for s in skipped)
|
||||
|
||||
def test_no_transport_returns_none(self):
|
||||
cfg, skipped = _translate_one_server("broken", {"description": "x"})
|
||||
assert cfg is None
|
||||
assert "no command or url" in skipped[0]
|
||||
|
||||
def test_sampling_dropped_with_warning(self):
|
||||
cfg, skipped = _translate_one_server("x", {
|
||||
"command": "y",
|
||||
"sampling": {"enabled": True, "model": "gemini-3-flash"},
|
||||
})
|
||||
assert "sampling" not in cfg
|
||||
assert any("sampling" in s for s in skipped)
|
||||
|
||||
def test_unknown_keys_warned(self):
|
||||
cfg, skipped = _translate_one_server("x", {
|
||||
"command": "y",
|
||||
"totally_made_up_key": "value",
|
||||
})
|
||||
assert "totally_made_up_key" not in cfg
|
||||
assert any("totally_made_up_key" in s for s in skipped)
|
||||
|
||||
def test_non_dict_input(self):
|
||||
cfg, skipped = _translate_one_server("x", "notadict") # type: ignore[arg-type]
|
||||
assert cfg is None
|
||||
|
||||
|
||||
# ---- TOML rendering ----
|
||||
|
||||
class TestTomlValueFormatter:
|
||||
def test_string_quoted(self):
|
||||
assert _format_toml_value("hello") == '"hello"'
|
||||
|
||||
def test_string_with_quotes_escaped(self):
|
||||
assert _format_toml_value('a"b') == '"a\\"b"'
|
||||
|
||||
def test_bool(self):
|
||||
assert _format_toml_value(True) == "true"
|
||||
assert _format_toml_value(False) == "false"
|
||||
|
||||
def test_int(self):
|
||||
assert _format_toml_value(42) == "42"
|
||||
|
||||
def test_float(self):
|
||||
assert _format_toml_value(180.0) == "180.0"
|
||||
|
||||
def test_list_of_strings(self):
|
||||
assert _format_toml_value(["a", "b"]) == '["a", "b"]'
|
||||
|
||||
def test_inline_table(self):
|
||||
out = _format_toml_value({"FOO": "bar"})
|
||||
assert out == '{ FOO = "bar" }'
|
||||
|
||||
def test_empty_inline_table(self):
|
||||
assert _format_toml_value({}) == "{}"
|
||||
|
||||
def test_string_with_newline_escaped(self):
|
||||
"""TOML basic strings don't allow literal newlines — a path or
|
||||
env var containing a newline must use \\n. Otherwise codex would
|
||||
refuse to load the config."""
|
||||
out = _format_toml_value("line one\nline two")
|
||||
assert "\n" not in out # no raw newline in output
|
||||
assert "\\n" in out
|
||||
|
||||
def test_string_with_tab_escaped(self):
|
||||
out = _format_toml_value("col1\tcol2")
|
||||
assert "\t" not in out
|
||||
assert "\\t" in out
|
||||
|
||||
def test_string_with_other_controls_escaped(self):
|
||||
for raw, expected in [
|
||||
("\r", "\\r"),
|
||||
("\f", "\\f"),
|
||||
("\b", "\\b"),
|
||||
]:
|
||||
out = _format_toml_value(f"x{raw}y")
|
||||
assert raw not in out, f"{raw!r} should be escaped"
|
||||
assert expected in out, f"{expected!r} should be in output"
|
||||
|
||||
def test_windows_path_escaped_correctly(self):
|
||||
out = _format_toml_value(r"C:\Users\Alice\.codex")
|
||||
# Each backslash should be doubled
|
||||
assert out == r'"C:\\Users\\Alice\\.codex"'
|
||||
|
||||
def test_atomic_write_no_temp_leak_on_success(self, tmp_path):
|
||||
"""The atomic-write path uses tempfile.mkstemp + rename. On
|
||||
success the temp file should not be left behind."""
|
||||
migrate({"mcp_servers": {"x": {"command": "y"}}},
|
||||
codex_home=tmp_path,
|
||||
discover_plugins=False,
|
||||
expose_hermes_tools=False,
|
||||
default_permission_profile=None)
|
||||
# config.toml should exist
|
||||
assert (tmp_path / "config.toml").exists()
|
||||
# And no .config.toml.* temp files left behind
|
||||
leftover = [p.name for p in tmp_path.iterdir()
|
||||
if p.name.startswith(".config.toml.")]
|
||||
assert leftover == [], f"temp file leaked after migration: {leftover}"
|
||||
|
||||
def test_atomic_write_cleanup_on_rename_failure(self, tmp_path, monkeypatch):
|
||||
"""If rename fails partway through (out of disk, permissions,
|
||||
crash), the temp file must be cleaned up. Otherwise repeated
|
||||
failed migrations would pile up .config.toml.* files."""
|
||||
from pathlib import Path as _Path
|
||||
original_replace = _Path.replace
|
||||
|
||||
def failing_replace(self, target):
|
||||
raise OSError("simulated disk full")
|
||||
|
||||
monkeypatch.setattr(_Path, "replace", failing_replace)
|
||||
report = migrate(
|
||||
{"mcp_servers": {"x": {"command": "y"}}},
|
||||
codex_home=tmp_path,
|
||||
discover_plugins=False,
|
||||
expose_hermes_tools=False,
|
||||
default_permission_profile=None,
|
||||
)
|
||||
# Error surfaced
|
||||
assert any("simulated disk full" in e for e in report.errors)
|
||||
# And no leaked temp file
|
||||
leftover = [p.name for p in tmp_path.iterdir()
|
||||
if p.name.startswith(".config.toml.")]
|
||||
assert leftover == [], f"temp files leaked: {leftover}"
|
||||
|
||||
def test_unsupported_type_raises(self):
|
||||
with pytest.raises(ValueError):
|
||||
_format_toml_value(object())
|
||||
|
||||
|
||||
class TestRenderToml:
|
||||
def test_starts_with_marker(self):
|
||||
out = render_codex_toml_section({})
|
||||
assert out.startswith(MIGRATION_MARKER)
|
||||
|
||||
def test_empty_servers_emits_placeholder(self):
|
||||
out = render_codex_toml_section({})
|
||||
assert "no MCP servers" in out
|
||||
|
||||
def test_servers_sorted_alphabetically(self):
|
||||
out = render_codex_toml_section({
|
||||
"zoo": {"command": "z"},
|
||||
"alpha": {"command": "a"},
|
||||
"middle": {"command": "m"},
|
||||
})
|
||||
# Find the section header positions and confirm order
|
||||
a_pos = out.find("[mcp_servers.alpha]")
|
||||
m_pos = out.find("[mcp_servers.middle]")
|
||||
z_pos = out.find("[mcp_servers.zoo]")
|
||||
assert 0 < a_pos < m_pos < z_pos
|
||||
|
||||
def test_server_with_args_and_env(self):
|
||||
out = render_codex_toml_section({
|
||||
"fs": {
|
||||
"command": "npx",
|
||||
"args": ["-y", "filesystem"],
|
||||
"env": {"PATH": "/usr/bin"},
|
||||
}
|
||||
})
|
||||
assert "[mcp_servers.fs]" in out
|
||||
assert 'command = "npx"' in out
|
||||
assert 'args = ["-y", "filesystem"]' in out
|
||||
# Env emitted as inline table
|
||||
assert 'env = { PATH = "/usr/bin" }' in out
|
||||
|
||||
|
||||
# ---- existing-block stripping ----
|
||||
|
||||
class TestStripExistingManagedBlock:
|
||||
def test_no_managed_block_unchanged(self):
|
||||
text = "[other]\nfoo = 1\n"
|
||||
assert _strip_existing_managed_block(text) == text
|
||||
|
||||
def test_strips_managed_block_alone(self):
|
||||
text = (
|
||||
f"{MIGRATION_MARKER}\n"
|
||||
"\n"
|
||||
"[mcp_servers.fs]\n"
|
||||
'command = "npx"\n'
|
||||
)
|
||||
assert _strip_existing_managed_block(text).strip() == ""
|
||||
|
||||
def test_preserves_user_content_above_managed_block(self):
|
||||
text = (
|
||||
"[model]\n"
|
||||
'name = "gpt-5.5"\n'
|
||||
"\n"
|
||||
f"{MIGRATION_MARKER}\n"
|
||||
"[mcp_servers.fs]\n"
|
||||
'command = "x"\n'
|
||||
)
|
||||
out = _strip_existing_managed_block(text)
|
||||
assert "[model]" in out
|
||||
assert 'name = "gpt-5.5"' in out
|
||||
assert "mcp_servers.fs" not in out
|
||||
|
||||
def test_preserves_unrelated_section_after_managed_block(self):
|
||||
text = (
|
||||
f"{MIGRATION_MARKER}\n"
|
||||
"[mcp_servers.fs]\n"
|
||||
'command = "x"\n'
|
||||
"\n"
|
||||
"[providers]\n"
|
||||
'foo = "bar"\n'
|
||||
)
|
||||
out = _strip_existing_managed_block(text)
|
||||
assert "mcp_servers.fs" not in out
|
||||
assert "[providers]" in out
|
||||
assert 'foo = "bar"' in out
|
||||
|
||||
|
||||
# ---- end-to-end migrate(, expose_hermes_tools=False) ----
|
||||
|
||||
class TestMigrate:
|
||||
def test_no_servers_no_plugins_no_perms_writes_placeholder(self, tmp_path):
|
||||
report = migrate({}, codex_home=tmp_path,
|
||||
discover_plugins=False,
|
||||
default_permission_profile=None, expose_hermes_tools=False)
|
||||
assert report.written
|
||||
text = (tmp_path / "config.toml").read_text()
|
||||
assert MIGRATION_MARKER in text
|
||||
assert "no MCP servers" in text or "no MCP servers, plugins, or permissions" in text
|
||||
|
||||
def test_no_servers_still_writes_permissions_default(self, tmp_path):
|
||||
"""Even with zero MCP servers, enabling the runtime should write the
|
||||
default permissions profile so users don't get prompted on every
|
||||
write attempt. This is the fix for quirk #2."""
|
||||
report = migrate({}, codex_home=tmp_path, discover_plugins=False, expose_hermes_tools=False)
|
||||
assert report.written
|
||||
text = (tmp_path / "config.toml").read_text()
|
||||
# Codex's schema: top-level `default_permissions` keying a built-in
|
||||
# profile name (prefixed with ":"). NOT a [permissions] section
|
||||
# (which is for *user-defined* profiles with structured fields).
|
||||
assert 'default_permissions = ":workspace"' in text
|
||||
assert report.wrote_permissions_default == ":workspace"
|
||||
|
||||
def test_explicit_none_permissions_skips_block(self, tmp_path):
|
||||
report = migrate({"mcp_servers": {"x": {"command": "y"}}},
|
||||
codex_home=tmp_path,
|
||||
discover_plugins=False,
|
||||
default_permission_profile=None, expose_hermes_tools=False)
|
||||
text = (tmp_path / "config.toml").read_text()
|
||||
assert "default_permissions" not in text
|
||||
assert "[permissions]" not in text
|
||||
assert report.wrote_permissions_default is None
|
||||
|
||||
def test_plugin_discovery_writes_plugin_blocks(self, tmp_path, monkeypatch):
|
||||
"""Discovered curated plugins land as [plugins."<name>@<marketplace>"]
|
||||
blocks. This is what OpenClaw calls 'migrate native codex plugins.'"""
|
||||
from hermes_cli import codex_runtime_plugin_migration as crpm
|
||||
|
||||
def fake_query(codex_home=None, timeout=8.0):
|
||||
return [
|
||||
{"name": "google-calendar", "marketplace": "openai-curated",
|
||||
"enabled": True},
|
||||
{"name": "github", "marketplace": "openai-curated",
|
||||
"enabled": True},
|
||||
], None
|
||||
monkeypatch.setattr(crpm, "_query_codex_plugins", fake_query)
|
||||
|
||||
report = migrate({}, codex_home=tmp_path, discover_plugins=True, expose_hermes_tools=False)
|
||||
text = (tmp_path / "config.toml").read_text()
|
||||
assert '[plugins."github@openai-curated"]' in text
|
||||
assert '[plugins."google-calendar@openai-curated"]' in text
|
||||
assert "enabled = true" in text
|
||||
assert "google-calendar@openai-curated" in report.migrated_plugins
|
||||
assert "github@openai-curated" in report.migrated_plugins
|
||||
|
||||
def test_plugin_discovery_failure_non_fatal(self, tmp_path, monkeypatch):
|
||||
"""If codex isn't installed or RPC fails, MCP migration still
|
||||
completes. The error surfaces in the report but doesn't abort."""
|
||||
from hermes_cli import codex_runtime_plugin_migration as crpm
|
||||
|
||||
def fake_query_fails(codex_home=None, timeout=8.0):
|
||||
return [], "codex CLI not available"
|
||||
monkeypatch.setattr(crpm, "_query_codex_plugins", fake_query_fails)
|
||||
|
||||
report = migrate({"mcp_servers": {"x": {"command": "y"}}},
|
||||
codex_home=tmp_path, discover_plugins=True, expose_hermes_tools=False)
|
||||
assert report.written
|
||||
assert report.migrated == ["x"]
|
||||
assert report.plugin_query_error == "codex CLI not available"
|
||||
assert report.migrated_plugins == []
|
||||
|
||||
def test_discover_plugins_false_skips_query(self, tmp_path, monkeypatch):
|
||||
"""Tests and restricted environments can opt out of the subprocess
|
||||
spawn entirely."""
|
||||
from hermes_cli import codex_runtime_plugin_migration as crpm
|
||||
|
||||
called = {"yes": False}
|
||||
def boom(*a, **kw):
|
||||
called["yes"] = True
|
||||
return [], None
|
||||
monkeypatch.setattr(crpm, "_query_codex_plugins", boom)
|
||||
|
||||
migrate({"mcp_servers": {"x": {"command": "y"}}},
|
||||
codex_home=tmp_path, discover_plugins=False, expose_hermes_tools=False)
|
||||
assert called["yes"] is False
|
||||
|
||||
def test_dry_run_skips_plugin_query(self, tmp_path, monkeypatch):
|
||||
"""Dry run should never spawn codex. Even with discover_plugins=True
|
||||
the query is skipped because dry_run takes precedence."""
|
||||
from hermes_cli import codex_runtime_plugin_migration as crpm
|
||||
|
||||
called = {"yes": False}
|
||||
def boom(*a, **kw):
|
||||
called["yes"] = True
|
||||
return [], None
|
||||
monkeypatch.setattr(crpm, "_query_codex_plugins", boom)
|
||||
|
||||
migrate({"mcp_servers": {"x": {"command": "y"}}},
|
||||
codex_home=tmp_path, dry_run=True, discover_plugins=True, expose_hermes_tools=False)
|
||||
assert called["yes"] is False
|
||||
|
||||
def test_re_run_replaces_plugin_block(self, tmp_path, monkeypatch):
|
||||
"""Plugin blocks are managed and re-runs should replace them
|
||||
cleanly — same idempotency contract as MCP servers."""
|
||||
from hermes_cli import codex_runtime_plugin_migration as crpm
|
||||
|
||||
# First run: only github
|
||||
monkeypatch.setattr(crpm, "_query_codex_plugins",
|
||||
lambda codex_home=None, timeout=8.0: (
|
||||
[{"name": "github", "marketplace": "openai-curated", "enabled": True}],
|
||||
None,
|
||||
))
|
||||
migrate({}, codex_home=tmp_path, discover_plugins=True,
|
||||
default_permission_profile=None, expose_hermes_tools=False)
|
||||
first = (tmp_path / "config.toml").read_text()
|
||||
assert "github@openai-curated" in first
|
||||
|
||||
# Second run: only canva (github went away)
|
||||
monkeypatch.setattr(crpm, "_query_codex_plugins",
|
||||
lambda codex_home=None, timeout=8.0: (
|
||||
[{"name": "canva", "marketplace": "openai-curated", "enabled": True}],
|
||||
None,
|
||||
))
|
||||
migrate({}, codex_home=tmp_path, discover_plugins=True,
|
||||
default_permission_profile=None, expose_hermes_tools=False)
|
||||
second = (tmp_path / "config.toml").read_text()
|
||||
assert "github@openai-curated" not in second
|
||||
assert "canva@openai-curated" in second
|
||||
|
||||
def test_expose_hermes_tools_writes_callback_mcp_entry(self, tmp_path):
|
||||
"""When expose_hermes_tools=True (production default), an
|
||||
[mcp_servers.hermes-tools] entry is written so codex calls back
|
||||
into Hermes for browser/web/delegate_task/vision/memory tools.
|
||||
|
||||
This is the fix for 'all other tools that codex doesn't provide
|
||||
should be useable by hermes' — quirk #7."""
|
||||
report = migrate({}, codex_home=tmp_path,
|
||||
discover_plugins=False,
|
||||
default_permission_profile=None,
|
||||
expose_hermes_tools=True)
|
||||
text = (tmp_path / "config.toml").read_text()
|
||||
assert "[mcp_servers.hermes-tools]" in text
|
||||
assert "hermes_tools_mcp_server" in text
|
||||
# Must include startup + tool timeouts so codex doesn't give up
|
||||
assert "startup_timeout_sec" in text
|
||||
assert "tool_timeout_sec" in text
|
||||
# And the entry is reported
|
||||
assert "hermes-tools" in report.migrated
|
||||
|
||||
def test_expose_hermes_tools_disabled_skips_entry(self, tmp_path):
|
||||
"""expose_hermes_tools=False suppresses the callback registration."""
|
||||
migrate({}, codex_home=tmp_path,
|
||||
discover_plugins=False,
|
||||
default_permission_profile=None,
|
||||
expose_hermes_tools=False)
|
||||
text = (tmp_path / "config.toml").read_text()
|
||||
assert "[mcp_servers.hermes-tools]" not in text
|
||||
assert "hermes_tools_mcp_server" not in text
|
||||
|
||||
def test_dry_run_doesnt_write(self, tmp_path):
|
||||
report = migrate({"mcp_servers": {"x": {"command": "y"}}},
|
||||
codex_home=tmp_path, dry_run=True, expose_hermes_tools=False)
|
||||
assert report.dry_run is True
|
||||
assert not (tmp_path / "config.toml").exists()
|
||||
assert "x" in report.migrated
|
||||
|
||||
def test_full_migration_round_trip(self, tmp_path):
|
||||
hermes_cfg = {
|
||||
"mcp_servers": {
|
||||
"filesystem": {
|
||||
"command": "npx",
|
||||
"args": ["-y", "@modelcontextprotocol/server-filesystem"],
|
||||
},
|
||||
"github": {
|
||||
"url": "https://api.github.com/mcp",
|
||||
"headers": {"Authorization": "Bearer x"},
|
||||
},
|
||||
}
|
||||
}
|
||||
report = migrate(hermes_cfg, codex_home=tmp_path, expose_hermes_tools=False)
|
||||
assert report.written
|
||||
text = (tmp_path / "config.toml").read_text()
|
||||
assert "[mcp_servers.filesystem]" in text
|
||||
assert "[mcp_servers.github]" in text
|
||||
assert 'command = "npx"' in text
|
||||
assert 'url = "https://api.github.com/mcp"' in text
|
||||
|
||||
def test_idempotent_re_run_replaces_managed_block(self, tmp_path):
|
||||
# First migration
|
||||
migrate({"mcp_servers": {"a": {"command": "x"}}}, codex_home=tmp_path, expose_hermes_tools=False)
|
||||
first_text = (tmp_path / "config.toml").read_text()
|
||||
assert "[mcp_servers.a]" in first_text
|
||||
# Second migration with different servers
|
||||
migrate({"mcp_servers": {"b": {"command": "y"}}}, codex_home=tmp_path, expose_hermes_tools=False)
|
||||
second_text = (tmp_path / "config.toml").read_text()
|
||||
assert "[mcp_servers.a]" not in second_text
|
||||
assert "[mcp_servers.b]" in second_text
|
||||
|
||||
def test_preserves_user_codex_config_above_marker(self, tmp_path):
|
||||
target = tmp_path / "config.toml"
|
||||
target.write_text(
|
||||
"[model]\n"
|
||||
'profile = "default"\n'
|
||||
"\n"
|
||||
"[providers.openai]\n"
|
||||
'api_key = "sk-test"\n'
|
||||
)
|
||||
migrate({"mcp_servers": {"a": {"command": "x"}}}, codex_home=tmp_path, expose_hermes_tools=False)
|
||||
new_text = target.read_text()
|
||||
# User's codex config preserved
|
||||
assert "[model]" in new_text
|
||||
assert 'profile = "default"' in new_text
|
||||
assert "[providers.openai]" in new_text
|
||||
# And new MCP block appended
|
||||
assert "[mcp_servers.a]" in new_text
|
||||
assert MIGRATION_MARKER in new_text
|
||||
|
||||
def test_preserves_user_mcp_server_outside_managed_block(self, tmp_path):
|
||||
"""Quirk #6: when a user adds their own MCP server entry directly
|
||||
to ~/.codex/config.toml outside Hermes' managed block, re-running
|
||||
migration must preserve it. Tested both above and below the
|
||||
managed block."""
|
||||
target = tmp_path / "config.toml"
|
||||
target.write_text(
|
||||
"[mcp_servers.user-above]\n"
|
||||
'command = "/usr/bin/above-server"\n'
|
||||
'args = ["--above"]\n'
|
||||
)
|
||||
# First migrate — adds managed block below user content
|
||||
migrate({"mcp_servers": {"hermes-mcp": {"command": "npx"}}},
|
||||
codex_home=tmp_path, discover_plugins=False,
|
||||
expose_hermes_tools=False)
|
||||
text = target.read_text()
|
||||
assert "user-above" in text, "user MCP server above managed block got nuked"
|
||||
assert 'command = "/usr/bin/above-server"' in text
|
||||
|
||||
# Append another user entry below the managed block
|
||||
target.write_text(
|
||||
text + "\n[mcp_servers.user-below]\ncommand = \"below-server\"\n"
|
||||
)
|
||||
# Re-migrate — both should survive
|
||||
migrate({"mcp_servers": {"hermes-mcp": {"command": "npx"}}},
|
||||
codex_home=tmp_path, discover_plugins=False,
|
||||
expose_hermes_tools=False)
|
||||
final = target.read_text()
|
||||
assert "user-above" in final
|
||||
assert "user-below" in final
|
||||
# And our managed block is still there with the new content
|
||||
assert "[mcp_servers.hermes-mcp]" in final
|
||||
|
||||
def test_skipped_keys_reported(self, tmp_path):
|
||||
report = migrate({
|
||||
"mcp_servers": {
|
||||
"x": {
|
||||
"command": "y",
|
||||
"sampling": {"enabled": True}, # codex has no equivalent
|
||||
}
|
||||
}
|
||||
}, codex_home=tmp_path, expose_hermes_tools=False)
|
||||
assert "x" in report.skipped_keys_per_server
|
||||
assert any("sampling" in s for s in report.skipped_keys_per_server["x"])
|
||||
|
||||
def test_invalid_mcp_servers_value(self, tmp_path):
|
||||
report = migrate({"mcp_servers": "notadict"}, codex_home=tmp_path, expose_hermes_tools=False)
|
||||
assert any("not a dict" in e for e in report.errors)
|
||||
|
||||
def test_server_without_transport_skipped_with_error(self, tmp_path):
|
||||
report = migrate({
|
||||
"mcp_servers": {"broken": {"description": "no command/url"}}
|
||||
}, codex_home=tmp_path, expose_hermes_tools=False)
|
||||
assert "broken" not in report.migrated
|
||||
assert any("broken" in e for e in report.errors)
|
||||
|
||||
def test_summary_reports_migration_count(self, tmp_path):
|
||||
report = migrate({
|
||||
"mcp_servers": {"a": {"command": "x"}, "b": {"command": "y"}}
|
||||
}, codex_home=tmp_path, expose_hermes_tools=False)
|
||||
summary = report.summary()
|
||||
assert "Migrated 2 MCP server(s)" in summary
|
||||
assert "- a" in summary
|
||||
assert "- b" in summary
|
||||
231
tests/hermes_cli/test_codex_runtime_switch.py
Normal file
231
tests/hermes_cli/test_codex_runtime_switch.py
Normal file
|
|
@ -0,0 +1,231 @@
|
|||
"""Tests for the /codex-runtime slash-command shared logic.
|
||||
|
||||
These cover the pure-Python state machine; CLI and gateway handlers are
|
||||
tested separately because they involve config persistence and prompt
|
||||
formatting that's surface-specific."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from hermes_cli import codex_runtime_switch as crs
|
||||
|
||||
|
||||
class TestParseArgs:
|
||||
@pytest.mark.parametrize("arg,expected", [
|
||||
("", None),
|
||||
(" ", None),
|
||||
("auto", "auto"),
|
||||
("codex_app_server", "codex_app_server"),
|
||||
("on", "codex_app_server"),
|
||||
("off", "auto"),
|
||||
("codex", "codex_app_server"),
|
||||
("default", "auto"),
|
||||
("hermes", "auto"),
|
||||
("ENABLE", "codex_app_server"), # case-insensitive
|
||||
("DiSaBlE", "auto"),
|
||||
])
|
||||
def test_valid_args(self, arg, expected):
|
||||
value, errors = crs.parse_args(arg)
|
||||
assert errors == []
|
||||
assert value == expected
|
||||
|
||||
def test_invalid_arg_returns_error(self):
|
||||
value, errors = crs.parse_args("turbo")
|
||||
assert value is None
|
||||
assert errors and "Unknown runtime" in errors[0]
|
||||
|
||||
|
||||
class TestGetCurrentRuntime:
|
||||
def test_default_when_unset(self):
|
||||
assert crs.get_current_runtime({}) == "auto"
|
||||
assert crs.get_current_runtime({"model": {}}) == "auto"
|
||||
assert crs.get_current_runtime({"model": {"openai_runtime": ""}}) == "auto"
|
||||
|
||||
def test_unrecognized_falls_back_to_auto(self):
|
||||
assert crs.get_current_runtime(
|
||||
{"model": {"openai_runtime": "garbage"}}
|
||||
) == "auto"
|
||||
|
||||
def test_explicit_codex(self):
|
||||
assert crs.get_current_runtime(
|
||||
{"model": {"openai_runtime": "codex_app_server"}}
|
||||
) == "codex_app_server"
|
||||
|
||||
def test_handles_non_dict_config(self):
|
||||
assert crs.get_current_runtime(None) == "auto" # type: ignore[arg-type]
|
||||
assert crs.get_current_runtime("notadict") == "auto" # type: ignore[arg-type]
|
||||
assert crs.get_current_runtime({"model": "notadict"}) == "auto"
|
||||
|
||||
|
||||
class TestSetRuntime:
|
||||
def test_creates_model_section_if_missing(self):
|
||||
cfg = {}
|
||||
old = crs.set_runtime(cfg, "codex_app_server")
|
||||
assert old == "auto"
|
||||
assert cfg["model"]["openai_runtime"] == "codex_app_server"
|
||||
|
||||
def test_returns_previous_value(self):
|
||||
cfg = {"model": {"openai_runtime": "codex_app_server"}}
|
||||
old = crs.set_runtime(cfg, "auto")
|
||||
assert old == "codex_app_server"
|
||||
assert cfg["model"]["openai_runtime"] == "auto"
|
||||
|
||||
def test_invalid_value_raises(self):
|
||||
with pytest.raises(ValueError):
|
||||
crs.set_runtime({}, "garbage")
|
||||
|
||||
|
||||
class TestApply:
|
||||
def test_read_only_call_reports_state(self):
|
||||
cfg = {"model": {"openai_runtime": "codex_app_server"}}
|
||||
with patch.object(crs, "check_codex_binary_ok",
|
||||
return_value=(True, "0.130.0")):
|
||||
r = crs.apply(cfg, None)
|
||||
assert r.success
|
||||
assert r.new_value == "codex_app_server"
|
||||
assert r.old_value == "codex_app_server"
|
||||
assert "codex_app_server" in r.message
|
||||
assert "0.130.0" in r.message
|
||||
|
||||
def test_no_change_when_already_set(self):
|
||||
cfg = {"model": {"openai_runtime": "auto"}}
|
||||
r = crs.apply(cfg, "auto")
|
||||
assert r.success
|
||||
assert r.message == "openai_runtime already set to auto"
|
||||
|
||||
def test_enable_blocked_when_codex_missing(self):
|
||||
cfg = {}
|
||||
with patch.object(crs, "check_codex_binary_ok",
|
||||
return_value=(False, "codex not found")):
|
||||
r = crs.apply(cfg, "codex_app_server")
|
||||
assert r.success is False
|
||||
assert "Cannot enable" in r.message
|
||||
assert "npm i -g @openai/codex" in r.message
|
||||
# Config NOT mutated on failure
|
||||
assert cfg.get("model", {}).get("openai_runtime") in (None, "")
|
||||
|
||||
def test_enable_succeeds_when_codex_present(self):
|
||||
cfg = {}
|
||||
persisted = {}
|
||||
|
||||
def persist(c):
|
||||
persisted.update(c)
|
||||
|
||||
with patch.object(crs, "check_codex_binary_ok",
|
||||
return_value=(True, "0.130.0")):
|
||||
r = crs.apply(cfg, "codex_app_server", persist_callback=persist)
|
||||
assert r.success
|
||||
assert r.new_value == "codex_app_server"
|
||||
assert r.old_value == "auto"
|
||||
assert r.requires_new_session is True
|
||||
assert "via MCP" in r.message # hermes-tools callback message
|
||||
assert cfg["model"]["openai_runtime"] == "codex_app_server"
|
||||
assert persisted["model"]["openai_runtime"] == "codex_app_server"
|
||||
|
||||
def test_disable_does_not_check_binary(self):
|
||||
cfg = {"model": {"openai_runtime": "codex_app_server"}}
|
||||
with patch.object(crs, "check_codex_binary_ok") as bin_check:
|
||||
r = crs.apply(cfg, "auto")
|
||||
assert r.success
|
||||
# Binary check is irrelevant when disabling — should not be called
|
||||
# with the codex_app_server enable-gate signature.
|
||||
assert r.new_value == "auto"
|
||||
assert r.old_value == "codex_app_server"
|
||||
|
||||
def test_persist_callback_failure_reported(self):
|
||||
cfg = {}
|
||||
|
||||
def persist_boom(c):
|
||||
raise IOError("disk full")
|
||||
|
||||
with patch.object(crs, "check_codex_binary_ok",
|
||||
return_value=(True, "0.130.0")):
|
||||
r = crs.apply(cfg, "codex_app_server", persist_callback=persist_boom)
|
||||
assert r.success is False
|
||||
assert "persist failed" in r.message
|
||||
assert "disk full" in r.message
|
||||
|
||||
def test_enable_triggers_mcp_migration(self):
|
||||
"""Enabling codex_app_server should auto-migrate Hermes mcp_servers
|
||||
to ~/.codex/config.toml so the spawned subprocess sees them."""
|
||||
cfg = {
|
||||
"mcp_servers": {
|
||||
"filesystem": {"command": "npx", "args": ["-y", "fs-server"]},
|
||||
}
|
||||
}
|
||||
|
||||
with patch.object(crs, "check_codex_binary_ok",
|
||||
return_value=(True, "0.130.0")), \
|
||||
patch("hermes_cli.codex_runtime_plugin_migration.migrate") as mig:
|
||||
mig.return_value.migrated = ["filesystem", "hermes-tools"]
|
||||
mig.return_value.migrated_plugins = []
|
||||
mig.return_value.plugin_query_error = None
|
||||
mig.return_value.wrote_permissions_default = ":workspace"
|
||||
mig.return_value.errors = []
|
||||
mig.return_value.target_path = "/fake/.codex/config.toml"
|
||||
r = crs.apply(cfg, "codex_app_server")
|
||||
assert r.success
|
||||
assert mig.called # migration was triggered
|
||||
# User MCP servers are reported (excluding internal hermes-tools)
|
||||
assert "Migrated 1 MCP server" in r.message
|
||||
assert "filesystem" in r.message
|
||||
# Permissions default surfaces
|
||||
assert "Default sandbox: :workspace" in r.message
|
||||
# Hermes tool callback announcement
|
||||
assert "via MCP" in r.message
|
||||
|
||||
def test_disable_does_not_trigger_migration(self):
|
||||
"""Switching back to auto must not write to ~/.codex/."""
|
||||
cfg = {
|
||||
"model": {"openai_runtime": "codex_app_server"},
|
||||
"mcp_servers": {"x": {"command": "y"}},
|
||||
}
|
||||
with patch("hermes_cli.codex_runtime_plugin_migration.migrate") as mig:
|
||||
r = crs.apply(cfg, "auto")
|
||||
assert r.success
|
||||
assert not mig.called # disabling does not migrate
|
||||
|
||||
def test_migration_failure_does_not_block_enable(self):
|
||||
"""If MCP migration raises, the runtime change still proceeds —
|
||||
users can manually re-run migration later."""
|
||||
cfg = {"mcp_servers": {"x": {"command": "y"}}}
|
||||
with patch.object(crs, "check_codex_binary_ok",
|
||||
return_value=(True, "0.130.0")), \
|
||||
patch("hermes_cli.codex_runtime_plugin_migration.migrate",
|
||||
side_effect=RuntimeError("disk full")):
|
||||
r = crs.apply(cfg, "codex_app_server")
|
||||
assert r.success # change still applied
|
||||
assert r.new_value == "codex_app_server"
|
||||
assert "MCP migration skipped" in r.message
|
||||
assert "disk full" in r.message
|
||||
|
||||
def test_binary_check_cached_within_apply(self):
|
||||
"""check_codex_binary_ok is invoked at most once per apply() call.
|
||||
|
||||
The enable path has three sites that need the version (state report,
|
||||
enable gate, success message). Without caching, a single
|
||||
/codex-runtime invocation spawns `codex --version` three times.
|
||||
Regression guard against a refactor that drops the cache.
|
||||
"""
|
||||
cfg = {}
|
||||
with patch.object(crs, "check_codex_binary_ok",
|
||||
return_value=(True, "0.130.0")) as bin_check, \
|
||||
patch("hermes_cli.codex_runtime_plugin_migration.migrate"):
|
||||
r = crs.apply(cfg, "codex_app_server")
|
||||
assert r.success
|
||||
assert bin_check.call_count == 1, (
|
||||
f"check_codex_binary_ok was called {bin_check.call_count} time(s); "
|
||||
"should be cached and called exactly once per apply()"
|
||||
)
|
||||
|
||||
def test_binary_check_cached_on_read_only_call(self):
|
||||
"""Read-only call (new_value=None) calls the binary check exactly
|
||||
once and reuses the result for the message."""
|
||||
cfg = {"model": {"openai_runtime": "codex_app_server"}}
|
||||
with patch.object(crs, "check_codex_binary_ok",
|
||||
return_value=(True, "0.130.0")) as bin_check:
|
||||
crs.apply(cfg, None)
|
||||
assert bin_check.call_count == 1
|
||||
344
tests/run_agent/test_codex_app_server_integration.py
Normal file
344
tests/run_agent/test_codex_app_server_integration.py
Normal file
|
|
@ -0,0 +1,344 @@
|
|||
"""Integration test for the codex_app_server runtime path through AIAgent.
|
||||
|
||||
Verifies that:
|
||||
- api_mode='codex_app_server' is accepted on AIAgent construction
|
||||
- run_conversation() takes the early-return path and never enters the
|
||||
chat completions loop
|
||||
- Projected messages from a fake Codex session land in the messages list
|
||||
- tool_iterations from the codex session tick the skill nudge counter
|
||||
- Memory nudge counter ticks once per turn
|
||||
- The returned dict has the same shape as the chat_completions path
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
import run_agent
|
||||
from agent.transports.codex_app_server_session import CodexAppServerSession, TurnResult
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_session(monkeypatch):
|
||||
"""Replace CodexAppServerSession with a stub that returns a fixed
|
||||
TurnResult, so we can drive AIAgent without spawning real codex."""
|
||||
|
||||
def fake_run_turn(self, user_input: str, **kwargs):
|
||||
return TurnResult(
|
||||
final_text=f"echo: {user_input}",
|
||||
projected_messages=[
|
||||
{"role": "assistant", "content": None,
|
||||
"tool_calls": [{"id": "exec_1", "type": "function",
|
||||
"function": {"name": "exec_command",
|
||||
"arguments": "{}"}}]},
|
||||
{"role": "tool", "tool_call_id": "exec_1", "content": "ok"},
|
||||
{"role": "assistant", "content": f"echo: {user_input}"},
|
||||
],
|
||||
tool_iterations=1,
|
||||
interrupted=False,
|
||||
error=None,
|
||||
turn_id="turn-stub-1",
|
||||
thread_id="thread-stub-1",
|
||||
)
|
||||
|
||||
monkeypatch.setattr(CodexAppServerSession, "run_turn", fake_run_turn)
|
||||
monkeypatch.setattr(
|
||||
CodexAppServerSession, "ensure_started", lambda self: "thread-stub-1"
|
||||
)
|
||||
|
||||
|
||||
def _make_codex_agent():
|
||||
"""Construct an AIAgent in codex_app_server mode without contacting any
|
||||
real provider. We pass api_mode explicitly so the constructor takes the
|
||||
fast path for direct credentials."""
|
||||
return run_agent.AIAgent(
|
||||
api_key="stub",
|
||||
base_url="https://stub.invalid",
|
||||
provider="openai",
|
||||
api_mode="codex_app_server",
|
||||
quiet_mode=True,
|
||||
skip_context_files=True,
|
||||
skip_memory=True,
|
||||
)
|
||||
|
||||
|
||||
class TestApiModeAccepted:
|
||||
def test_api_mode_is_codex_app_server(self):
|
||||
agent = _make_codex_agent()
|
||||
assert agent.api_mode == "codex_app_server"
|
||||
|
||||
|
||||
class TestRunConversationCodexPath:
|
||||
def test_run_conversation_returns_codex_shape(self, fake_session):
|
||||
agent = _make_codex_agent()
|
||||
# No background review fork during tests
|
||||
with patch.object(agent, "_spawn_background_review", return_value=None):
|
||||
result = agent.run_conversation("hello there")
|
||||
assert result["final_response"] == "echo: hello there"
|
||||
assert result["completed"] is True
|
||||
assert result["partial"] is False
|
||||
assert result["error"] is None
|
||||
assert result["api_calls"] == 1
|
||||
assert result["codex_thread_id"] == "thread-stub-1"
|
||||
assert result["codex_turn_id"] == "turn-stub-1"
|
||||
|
||||
def test_projected_messages_are_spliced(self, fake_session):
|
||||
agent = _make_codex_agent()
|
||||
with patch.object(agent, "_spawn_background_review", return_value=None):
|
||||
result = agent.run_conversation("hello")
|
||||
msgs = result["messages"]
|
||||
# User message + 3 projected (assistant tool_call + tool + assistant text)
|
||||
assert len(msgs) >= 4
|
||||
assert msgs[0]["role"] == "user"
|
||||
assert msgs[0]["content"] == "hello"
|
||||
# Last assistant message has the final text
|
||||
final = [m for m in msgs if m.get("role") == "assistant"
|
||||
and m.get("content") == "echo: hello"]
|
||||
assert final, f"expected final assistant message in {msgs}"
|
||||
|
||||
def test_nudge_counters_tick(self, fake_session):
|
||||
"""The skill nudge counter must accumulate tool_iterations across
|
||||
turns. The memory nudge counter is gated on memory being configured
|
||||
(which we skip via skip_memory=True), so we don't assert on it here —
|
||||
a separate test below covers that path explicitly."""
|
||||
agent = _make_codex_agent()
|
||||
agent._iters_since_skill = 0
|
||||
agent._user_turn_count = 0
|
||||
with patch.object(agent, "_spawn_background_review", return_value=None):
|
||||
agent.run_conversation("first")
|
||||
assert agent._iters_since_skill == 1 # one tool_iteration in fake turn
|
||||
# _user_turn_count is incremented by run_conversation pre-loop, not
|
||||
# by the codex helper — confirms we delegate that to the standard flow.
|
||||
assert agent._user_turn_count == 1
|
||||
with patch.object(agent, "_spawn_background_review", return_value=None):
|
||||
agent.run_conversation("second")
|
||||
assert agent._iters_since_skill == 2
|
||||
assert agent._user_turn_count == 2
|
||||
|
||||
def test_user_message_not_duplicated(self, fake_session):
|
||||
"""Regression guard: the user message must appear exactly once in
|
||||
the messages list. The standard run_conversation pre-loop appends
|
||||
it, and the codex helper must NOT append again."""
|
||||
agent = _make_codex_agent()
|
||||
with patch.object(agent, "_spawn_background_review", return_value=None):
|
||||
result = agent.run_conversation("ping unique 12345")
|
||||
user_count = sum(
|
||||
1 for m in result["messages"]
|
||||
if m.get("role") == "user" and m.get("content") == "ping unique 12345"
|
||||
)
|
||||
assert user_count == 1, f"user message appeared {user_count}× in {result['messages']}"
|
||||
|
||||
def test_background_review_NOT_invoked_below_threshold(self, fake_session):
|
||||
"""A single turn shouldn't trigger background review — counters
|
||||
haven't reached the nudge interval (default 10)."""
|
||||
agent = _make_codex_agent()
|
||||
agent._memory_nudge_interval = 10
|
||||
agent._skill_nudge_interval = 10
|
||||
agent._iters_since_skill = 0
|
||||
with patch.object(agent, "_spawn_background_review",
|
||||
return_value=None) as spawn:
|
||||
agent.run_conversation("ping")
|
||||
# Below threshold → review should NOT fire (was a real bug:
|
||||
# the helper was calling _spawn_background_review() with no
|
||||
# args after every turn, which would crash with TypeError).
|
||||
assert not spawn.called
|
||||
|
||||
def test_background_review_skill_trigger_fires_above_threshold(
|
||||
self, monkeypatch
|
||||
):
|
||||
"""When tool iterations cross the skill nudge interval, the
|
||||
background review fires with review_skills=True and the right
|
||||
messages_snapshot signature."""
|
||||
from agent.transports.codex_app_server_session import (
|
||||
CodexAppServerSession, TurnResult,
|
||||
)
|
||||
# Make the fake session report 10 tool iterations in one turn
|
||||
# (matching the default skill threshold).
|
||||
def fake_run_turn(self, user_input: str, **kwargs):
|
||||
return TurnResult(
|
||||
final_text=f"echo: {user_input}",
|
||||
projected_messages=[
|
||||
{"role": "assistant", "content": f"echo: {user_input}"},
|
||||
],
|
||||
tool_iterations=10,
|
||||
turn_id="t1", thread_id="th1",
|
||||
)
|
||||
monkeypatch.setattr(CodexAppServerSession, "run_turn", fake_run_turn)
|
||||
monkeypatch.setattr(
|
||||
CodexAppServerSession, "ensure_started", lambda self: "th1"
|
||||
)
|
||||
|
||||
agent = _make_codex_agent()
|
||||
agent._skill_nudge_interval = 10
|
||||
agent._iters_since_skill = 0
|
||||
# Make valid_tool_names include 'skill_manage' so the gate passes
|
||||
agent.valid_tool_names = set(getattr(agent, "valid_tool_names", set()))
|
||||
agent.valid_tool_names.add("skill_manage")
|
||||
|
||||
with patch.object(agent, "_spawn_background_review",
|
||||
return_value=None) as spawn:
|
||||
agent.run_conversation("do tool work")
|
||||
|
||||
assert spawn.called, "skill threshold tripped but review didn't fire"
|
||||
# Verify the call signature matches what _spawn_background_review
|
||||
# actually expects — this is the regression guard for the original
|
||||
# bug where the codex path called it with no args at all.
|
||||
call = spawn.call_args
|
||||
assert "messages_snapshot" in call.kwargs
|
||||
assert isinstance(call.kwargs["messages_snapshot"], list)
|
||||
assert call.kwargs["review_skills"] is True
|
||||
# Counter should be reset after the review fires
|
||||
assert agent._iters_since_skill == 0
|
||||
|
||||
def test_background_review_signature_never_breaks(self, fake_session):
|
||||
"""Even when no trigger fires, the helper must never call
|
||||
_spawn_background_review with the wrong signature. Run a turn,
|
||||
then run another turn after manually tripping the skill counter
|
||||
and confirm the call shape is the kwargs-only form the function
|
||||
actually accepts."""
|
||||
agent = _make_codex_agent()
|
||||
agent._skill_nudge_interval = 1 # very low so any iter trips it
|
||||
agent._iters_since_skill = 0
|
||||
agent.valid_tool_names = set(getattr(agent, "valid_tool_names", set()))
|
||||
agent.valid_tool_names.add("skill_manage")
|
||||
|
||||
with patch.object(agent, "_spawn_background_review",
|
||||
return_value=None) as spawn:
|
||||
agent.run_conversation("first")
|
||||
# The fake session reports tool_iterations=1, which trips
|
||||
# _skill_nudge_interval=1. So review should fire.
|
||||
assert spawn.called
|
||||
# Critical invariant: positional args must be empty, all real
|
||||
# args must be kwargs (matching _spawn_background_review's
|
||||
# actual signature).
|
||||
call = spawn.call_args
|
||||
assert call.args == (), (
|
||||
f"expected no positional args, got {call.args!r} — "
|
||||
"would crash _spawn_background_review at runtime"
|
||||
)
|
||||
assert "messages_snapshot" in call.kwargs
|
||||
|
||||
def test_chat_completions_loop_is_not_entered(self, fake_session):
|
||||
"""The early-return must bypass the regular API call loop entirely.
|
||||
We confirm by patching the SDK call and asserting it's never invoked."""
|
||||
agent = _make_codex_agent()
|
||||
# The chat_completions loop calls self.client.chat.completions.create(...)
|
||||
# If our early-return works, that path is dead.
|
||||
with patch.object(agent, "client") as client_mock, patch.object(
|
||||
agent, "_spawn_background_review", return_value=None
|
||||
):
|
||||
agent.run_conversation("hi")
|
||||
assert not client_mock.chat.completions.create.called
|
||||
|
||||
|
||||
class TestReviewForkApiModeDowngrade:
|
||||
"""When the parent agent runs on codex_app_server, the background
|
||||
review fork must downgrade to codex_responses — otherwise the fork
|
||||
can't dispatch agent-loop tools (memory, skill_manage) which is the
|
||||
whole point of the review."""
|
||||
|
||||
def test_codex_app_server_parent_downgrades_review_fork(self):
|
||||
"""Live test against the real _spawn_background_review code path:
|
||||
verify the review_agent gets api_mode=codex_responses when the
|
||||
parent is codex_app_server."""
|
||||
from unittest.mock import MagicMock, patch as _patch
|
||||
agent = _make_codex_agent()
|
||||
# Pretend memory + skills are configured so the review fork
|
||||
# reaches the AIAgent constructor.
|
||||
agent._memory_store = MagicMock()
|
||||
agent._memory_enabled = True
|
||||
agent._user_profile_enabled = True
|
||||
# Mock _current_main_runtime to return the parent's codex_app_server
|
||||
# state so we can confirm the helper detects + downgrades it.
|
||||
agent._current_main_runtime = lambda: {
|
||||
"api_mode": "codex_app_server",
|
||||
"base_url": "https://chatgpt.com/backend-api/codex",
|
||||
"api_key": "stub-token",
|
||||
}
|
||||
# Capture what AIAgent gets constructed with inside the helper.
|
||||
captured = {}
|
||||
|
||||
def _capture_init(self, **kwargs):
|
||||
captured.update(kwargs)
|
||||
# Set bare attributes the rest of the spawn function reads
|
||||
# so it can finish without exploding.
|
||||
self.api_mode = kwargs.get("api_mode")
|
||||
self.provider = kwargs.get("provider")
|
||||
self.model = kwargs.get("model")
|
||||
self._memory_write_origin = None
|
||||
self._memory_write_context = None
|
||||
self._memory_store = None
|
||||
self._memory_enabled = False
|
||||
self._user_profile_enabled = False
|
||||
self._memory_nudge_interval = 0
|
||||
self._skill_nudge_interval = 0
|
||||
self.suppress_status_output = False
|
||||
self._session_messages = []
|
||||
|
||||
def _no_op_run_conv(*a, **kw):
|
||||
return {"final_response": "", "messages": []}
|
||||
self.run_conversation = _no_op_run_conv
|
||||
|
||||
def _no_op_close(*a, **kw):
|
||||
return None
|
||||
self.close = _no_op_close
|
||||
|
||||
with _patch("run_agent.AIAgent.__init__", _capture_init):
|
||||
agent._spawn_background_review(
|
||||
messages_snapshot=[{"role": "user", "content": "x"}],
|
||||
review_memory=True,
|
||||
review_skills=False,
|
||||
)
|
||||
# Wait for the spawned thread to actually execute
|
||||
import time
|
||||
for _ in range(30):
|
||||
if "api_mode" in captured:
|
||||
break
|
||||
time.sleep(0.1)
|
||||
|
||||
assert captured.get("api_mode") == "codex_responses", (
|
||||
f"review fork should be downgraded to codex_responses when "
|
||||
f"parent is codex_app_server; got {captured.get('api_mode')!r}"
|
||||
)
|
||||
|
||||
|
||||
class TestErrorHandling:
|
||||
def test_session_exception_returns_partial_with_error(self, monkeypatch):
|
||||
def boom_run_turn(self, user_input, **kwargs):
|
||||
raise RuntimeError("subprocess died")
|
||||
|
||||
monkeypatch.setattr(CodexAppServerSession, "ensure_started",
|
||||
lambda self: "t1")
|
||||
monkeypatch.setattr(CodexAppServerSession, "run_turn", boom_run_turn)
|
||||
|
||||
agent = _make_codex_agent()
|
||||
with patch.object(agent, "_spawn_background_review", return_value=None):
|
||||
result = agent.run_conversation("hi")
|
||||
assert result["completed"] is False
|
||||
assert result["partial"] is True
|
||||
assert "subprocess died" in result["error"]
|
||||
assert "codex-runtime auto" in result["final_response"]
|
||||
|
||||
def test_interrupted_turn_marked_partial(self, monkeypatch):
|
||||
def interrupted_turn(self, user_input, **kwargs):
|
||||
return TurnResult(
|
||||
final_text="",
|
||||
projected_messages=[],
|
||||
tool_iterations=0,
|
||||
interrupted=True,
|
||||
error="user interrupted",
|
||||
turn_id="t",
|
||||
thread_id="th",
|
||||
)
|
||||
monkeypatch.setattr(CodexAppServerSession, "ensure_started",
|
||||
lambda self: "th")
|
||||
monkeypatch.setattr(CodexAppServerSession, "run_turn", interrupted_turn)
|
||||
|
||||
agent = _make_codex_agent()
|
||||
with patch.object(agent, "_spawn_background_review", return_value=None):
|
||||
result = agent.run_conversation("hi")
|
||||
assert result["completed"] is False
|
||||
assert result["partial"] is True
|
||||
assert result["error"] == "user interrupted"
|
||||
Loading…
Add table
Add a link
Reference in a new issue