fix(xai): accept Grok Build code during loopback wait + tiny screenshot guard

xAI's consent page renders the authorization code in-page instead of
redirecting to the loopback callback, so the listener just hangs and the
manual-paste flow demands a callback URL that never contains the token.

- auth.py: poll stdin non-blockingly while waiting for the xAI loopback
  callback; accept a pasted bare Grok Build code and substitute the locally
  generated state (PKCE code_verifier still binds the exchange). No need to
  wait for timeout or re-run with --manual-paste.
- computer_use: parse PNG/JPEG dimensions from base64 and fall back to the
  text/AX/SOM payload when the screenshot is below the provider minimum
  (8x8), which xAI rejects with HTTP 400.
- model_setup_flows.py: xAI credential reuse prompt uses the standard radio
  picker via a shared _prompt_auth_credentials_choice helper.
- main.py: thread a title through _prompt_provider_choice; re-home the helper
  import (flows live in model_setup_flows.py post-decomposition).

Salvaged from #36781 onto current main (contributor's main.py edits re-homed
to model_setup_flows.py, where the flows were extracted since the PR opened).
This commit is contained in:
Ondrej Drapalik 2026-06-09 22:36:08 -07:00 committed by Teknium
parent 095f526b11
commit 1c055a4c58
8 changed files with 330 additions and 50 deletions

View file

@ -2665,12 +2665,23 @@ def _xai_wait_for_callback(
result: dict[str, Any],
*,
timeout_seconds: float = 180.0,
manual_paste_redirect_uri: Optional[str] = None,
) -> dict[str, Any]:
deadline = time.monotonic() + max(5.0, timeout_seconds)
if manual_paste_redirect_uri and sys.stdin.isatty():
print()
print("If xAI shows a Grok Build code instead of redirecting,")
print("paste that code here and press Enter.")
try:
while time.monotonic() < deadline:
if result["code"] or result["error"]:
return result
if manual_paste_redirect_uri:
raw_paste = _read_ready_stdin_line()
if raw_paste and raw_paste.strip():
pasted = _parse_pasted_callback(raw_paste)
pasted["_manual_paste"] = True
return pasted
time.sleep(0.1)
finally:
server.shutdown()
@ -2694,6 +2705,21 @@ def _xai_wait_for_callback(
)
def _read_ready_stdin_line() -> Optional[str]:
    """Fetch a line the user has already typed, without ever waiting for one.

    Returns:
        The result of ``sys.stdin.readline()`` when stdin is a TTY and a
        zero-timeout ``select`` reports it readable; ``None`` when stdin is
        not a TTY, nothing is pending, or probing/reading stdin raises.
    """
    try:
        stdin = sys.stdin
        if stdin.isatty():
            import select

            # A timeout of 0 turns select() into a pure readiness poll.
            readable = select.select([stdin], [], [], 0)[0]
            if readable:
                return stdin.readline()
    except Exception:
        # Best-effort probe: any failure is reported as "nothing pending".
        pass
    return None
def _spotify_token_payload_to_state(
token_payload: Dict[str, Any],
*,
@ -6669,6 +6695,7 @@ def _xai_oauth_loopback_login(
authorization_endpoint = discovery["authorization_endpoint"]
token_endpoint = discovery["token_endpoint"]
allow_missing_state = False
if manual_paste:
# No HTTP listener — synthesize a redirect_uri matching what
# the server would have bound to so the authorize URL the user
@ -6695,6 +6722,7 @@ def _xai_oauth_loopback_login(
print("Open this URL to authorize Hermes with xAI:")
print(authorize_url)
callback = _prompt_manual_callback_paste(redirect_uri)
allow_missing_state = True
else:
server, thread, callback_result, redirect_uri = _xai_start_callback_server()
try:
@ -6734,6 +6762,7 @@ def _xai_oauth_loopback_login(
thread,
callback_result,
timeout_seconds=max(30.0, timeout_seconds * 9),
manual_paste_redirect_uri=redirect_uri,
)
except AuthError as exc:
if (
@ -6750,6 +6779,7 @@ def _xai_oauth_loopback_login(
callback = _prompt_manual_callback_paste(redirect_uri)
if callback.get("code") is None and callback.get("error") is None:
raise exc
allow_missing_state = True
except Exception:
try:
server.shutdown()
@ -6770,7 +6800,7 @@ def _xai_oauth_loopback_login(
code="xai_authorization_failed",
)
callback_state = callback.get("state")
# Manual-paste bare-code path: when a user pastes only the opaque
# Manual bare-code paths: when a user pastes only the opaque
# authorization code (no ``code=``/``state=`` query parameters),
# ``_parse_pasted_callback`` returns ``state=None``. xAI's consent
# page renders the code in-page rather than redirecting through the
@ -6778,10 +6808,12 @@ def _xai_oauth_loopback_login(
# VPS, container consoles) the bare code is the only thing the user
# can obtain. PKCE (code_verifier) still binds the exchange to this
# client, so the local state-equality check is redundant on the
# bare-code path — we substitute the locally generated state to keep
# bare-code paths — we substitute the locally generated state to keep
# the rest of the validation chain (and the token exchange) unchanged.
# See #26923 (AccursedGalaxy comment, 2026-05-20).
if callback_state is None and manual_paste:
if callback.get("_manual_paste"):
allow_missing_state = True
if callback_state is None and (manual_paste or allow_missing_state):
callback_state = state
if callback_state != state:
raise AuthError(

View file

@ -499,6 +499,7 @@ from hermes_cli import __version__, __release_date__
# (god-file decomposition Phase 2). Re-imported here so select_provider_and_model and
# existing test monkeypatches (hermes_cli.main._model_flow_*) keep resolving unchanged.
from hermes_cli.model_setup_flows import (
_prompt_auth_credentials_choice,
_model_flow_openrouter,
_model_flow_nous,
_model_flow_openai_codex,
@ -2830,7 +2831,12 @@ def select_provider_and_model(args=None):
member_labels = [
provider_labels.get(m, m) for m in selected_members
]
member_idx = _prompt_provider_choice(member_labels, default=member_default)
group_label = ordered[provider_idx][1].split("", 1)[0]
member_idx = _prompt_provider_choice(
member_labels,
default=member_default,
title=f"Select {group_label} provider:",
)
if member_idx is None:
print("No change.")
return
@ -3331,7 +3337,7 @@ def _aux_flow_custom_endpoint(task: str, task_cfg: dict) -> None:
print(f"{display_name}: custom ({short_url})" + (f" · {model}" if model else ""))
def _prompt_provider_choice(choices, *, default=0):
def _prompt_provider_choice(choices, *, default=0, title="Select provider:"):
"""Show provider selection menu with curses arrow-key navigation.
Falls back to a numbered list when curses is unavailable (e.g. piped
@ -3341,7 +3347,7 @@ def _prompt_provider_choice(choices, *, default=0):
try:
from hermes_cli.setup import _curses_prompt_choice
idx = _curses_prompt_choice("Select provider:", choices, default)
idx = _curses_prompt_choice(title, choices, default)
if idx >= 0:
print()
return idx
@ -3349,7 +3355,7 @@ def _prompt_provider_choice(choices, *, default=0):
pass
# Fallback: numbered list
print("Select provider:")
print(title)
for i, c in enumerate(choices, 1):
marker = "" if i - 1 == default else " "
print(f" {marker} {i}. {c}")

View file

@ -25,6 +25,44 @@ import os
import subprocess
def _prompt_auth_credentials_choice(title: str) -> str:
    """Ask whether to keep, refresh, or abandon already-stored credentials.

    The curses picker from ``hermes_cli.setup`` is tried first. If importing
    or running it raises, or it returns a negative index, a numbered list is
    printed and the answer is read with ``input()``.

    Args:
        title: Heading shown above the choices in both UIs.

    Returns:
        ``"use"``, ``"reauth"`` or ``"cancel"``. In the numbered fallback,
        ``"2"`` maps to ``"reauth"``, ``"3"`` to ``"cancel"``, and anything
        else (including Ctrl-C / EOF at the prompt) to ``"use"``.
    """
    options = (
        ("use", "Use existing credentials"),
        ("reauth", "Reauthenticate (new OAuth login)"),
        ("cancel", "Cancel"),
    )
    labels = [label for _, label in options]

    try:
        from hermes_cli.setup import _curses_prompt_choice

        picked = _curses_prompt_choice(title, labels, 0)
        if picked >= 0:
            print()
            return options[picked][0]
    except Exception:
        # Picker unavailable or failed: drop through to the numbered prompt.
        pass

    print(title)
    for number, label in enumerate(labels, start=1):
        marker = " " if number != 1 else ""
        print(f" {marker} {number}. {label}")
    print()

    try:
        answer = input(" Choice [1/2/3]: ").strip()
    except (KeyboardInterrupt, EOFError):
        answer = "1"

    return {"2": "reauth", "3": "cancel"}.get(answer, "use")
def _model_flow_openrouter(config, current_model=""):
"""OpenRouter provider: ensure API key, then pick model."""
from hermes_cli.main import _prompt_api_key
@ -321,16 +359,9 @@ def _model_flow_openai_codex(config, current_model=""):
if status.get("logged_in"):
print(" OpenAI Codex credentials: ✓")
print()
print(" 1. Use existing credentials")
print(" 2. Reauthenticate (new OAuth login)")
print(" 3. Cancel")
print()
try:
choice = input(" Choice [1/2/3]: ").strip()
except (KeyboardInterrupt, EOFError):
choice = "1"
choice = _prompt_auth_credentials_choice("OpenAI Codex credentials:")
if choice == "2":
if choice == "reauth":
print("Starting a fresh OpenAI Codex login...")
print()
try:
@ -350,7 +381,7 @@ def _model_flow_openai_codex(config, current_model=""):
if not status.get("logged_in"):
print("Login failed.")
return
elif choice == "3":
elif choice == "cancel":
return
else:
print("Not logged into OpenAI Codex. Starting login...")
@ -411,16 +442,11 @@ def _model_flow_xai_oauth(_config, current_model="", *, args=None):
if status.get("logged_in"):
print(" xAI Grok OAuth (SuperGrok / Premium+) credentials: ✓")
print()
print(" 1. Use existing credentials")
print(" 2. Reauthenticate (new OAuth login)")
print(" 3. Cancel")
print()
try:
choice = input(" Choice [1/2/3]: ").strip()
except (KeyboardInterrupt, EOFError):
choice = "1"
choice = _prompt_auth_credentials_choice(
"xAI Grok OAuth (SuperGrok / Premium+) credentials:"
)
if choice == "2":
if choice == "reauth":
print("Starting a fresh xAI OAuth login...")
print()
try:
@ -444,7 +470,7 @@ def _model_flow_xai_oauth(_config, current_model="", *, args=None):
except Exception as exc:
print(f"Login failed: {exc}")
return
elif choice == "3":
elif choice == "cancel":
return
else:
print("Not logged into xAI Grok OAuth (SuperGrok / Premium+). Starting login...")
@ -2560,20 +2586,13 @@ def _model_flow_anthropic(config, current_model=""):
elif cc_available:
print(" Claude Code credentials: ✓ (auto-detected)")
print()
print(" 1. Use existing credentials")
print(" 2. Reauthenticate (new OAuth login)")
print(" 3. Cancel")
print()
try:
choice = input(" Choice [1/2/3]: ").strip()
except (KeyboardInterrupt, EOFError):
choice = "1"
choice = _prompt_auth_credentials_choice("Anthropic credentials:")
if choice == "2":
if choice == "reauth":
needs_auth = True
elif choice == "3":
elif choice == "cancel":
return
# choice == "1" or default: use existing, proceed to model selection
# choice == "use" or default: use existing, proceed to model selection
if needs_auth:
# Show auth method choice

View file

@ -465,7 +465,7 @@ def test_xai_loopback_login_manual_paste_missing_code_raises(monkeypatch):
def test_xai_loopback_login_timeout_falls_back_to_manual_paste(monkeypatch):
"""Loopback timeout should offer the existing manual-paste path."""
"""Loopback timeout should accept a bare Grok Build code paste."""
monkeypatch.setattr(
auth_mod, "_xai_oauth_discovery",
lambda *_a, **_k: {
@ -523,7 +523,7 @@ def test_xai_loopback_login_timeout_falls_back_to_manual_paste(monkeypatch):
captured["prompt_calls"] += 1
return {
"code": "manual-auth-code",
"state": captured["state"],
"state": None,
"error": None,
"error_description": None,
}
@ -558,6 +558,48 @@ def test_xai_loopback_login_timeout_falls_back_to_manual_paste(monkeypatch):
assert creds["tokens"]["refresh_token"] == "rt-timeout"
def test_xai_wait_for_callback_accepts_ready_stdin_code(monkeypatch):
    """A Grok Build code typed during the loopback wait is returned as a manual paste."""

    class _RecordingServer:
        # Flags flipped by the teardown calls the wait helper is expected to make.
        shutdown_called = False
        close_called = False

        def shutdown(self):
            self.shutdown_called = True

        def server_close(self):
            self.close_called = True

    class _RecordingThread:
        joined = False

        def join(self, timeout=None):
            self.joined = True

    stub_server = _RecordingServer()
    stub_thread = _RecordingThread()
    pending_result = {"code": None, "error": None}

    # Pretend the user has already typed the code and pressed Enter.
    monkeypatch.setattr(
        auth_mod,
        "_read_ready_stdin_line",
        lambda: "ready-grok-build-code\n",
    )

    outcome = auth_mod._xai_wait_for_callback(
        stub_server,
        stub_thread,
        pending_result,
        timeout_seconds=5,
        manual_paste_redirect_uri="http://127.0.0.1:56121/callback",
    )

    # The pasted bare code is surfaced without a state and flagged as manual.
    assert outcome["code"] == "ready-grok-build-code"
    assert outcome["state"] is None
    assert outcome["_manual_paste"] is True

    # The listener must still be torn down on this early-return path.
    assert stub_server.shutdown_called is True
    assert stub_server.close_called is True
    assert stub_thread.joined is True
def test_xai_loopback_login_timeout_noninteractive_reraises(monkeypatch):
"""Non-interactive stdin must keep the original timeout error."""
monkeypatch.setattr(

View file

@ -0,0 +1,78 @@
import argparse
def test_xai_model_flow_reauth_uses_standard_radio_prompt(monkeypatch):
    """Picking index 1 in the radio prompt forces a fresh xAI login with the CLI args."""
    from hermes_cli import main as main_mod

    login_calls = []

    def _fake_login(args, provider, force_new_login=False):
        login_calls.append({"args": args, "force_new_login": force_new_login})

    # Already logged in, so the flow must show the reuse/reauth/cancel prompt.
    monkeypatch.setattr(
        "hermes_cli.auth.get_xai_oauth_auth_status",
        lambda: {"logged_in": True},
    )
    # Index 1 is the "Reauthenticate" entry of the radio picker.
    monkeypatch.setattr(
        "hermes_cli.setup._curses_prompt_choice",
        lambda title, choices, default, description=None: 1,
    )
    monkeypatch.setattr("hermes_cli.auth._login_xai_oauth", _fake_login)
    monkeypatch.setattr(
        "hermes_cli.auth.resolve_xai_oauth_runtime_credentials",
        lambda *args, **kwargs: {"base_url": "https://api.x.ai/v1"},
    )
    monkeypatch.setattr(
        "hermes_cli.auth._prompt_model_selection",
        lambda model_ids, current_model="": None,
    )

    cli_args = argparse.Namespace(manual_paste=True, no_browser=True, timeout=3)
    main_mod._model_flow_xai_oauth(
        {},
        current_model="grok-build-0.1",
        args=cli_args,
    )

    assert len(login_calls) == 1
    recorded = login_calls[0]
    assert recorded["force_new_login"] is True
    assert recorded["args"].manual_paste is True
    assert recorded["args"].no_browser is True
    assert recorded["args"].timeout == 3
def test_xai_model_flow_cancel_skips_reauth(monkeypatch):
    """Picking index 2 (Cancel) must neither log in again nor open model selection."""
    from hermes_cli import main as main_mod

    def _forbidden(message):
        # Build a stand-in that fails the test as soon as it is called.
        def _fail(*args, **kwargs):
            raise AssertionError(message)

        return _fail

    monkeypatch.setattr(
        "hermes_cli.auth.get_xai_oauth_auth_status",
        lambda: {"logged_in": True},
    )
    # Index 2 is the "Cancel" entry of the radio picker.
    monkeypatch.setattr(
        "hermes_cli.setup._curses_prompt_choice",
        lambda title, choices, default, description=None: 2,
    )
    monkeypatch.setattr(
        "hermes_cli.auth._login_xai_oauth",
        _forbidden("should not reauthenticate"),
    )
    monkeypatch.setattr(
        "hermes_cli.auth._prompt_model_selection",
        _forbidden("should not pick a model"),
    )

    main_mod._model_flow_xai_oauth({}, current_model="grok-build-0.1")
def test_auth_credentials_choice_falls_back_to_numbered_prompt(monkeypatch):
    """A negative index from the picker hands control to the numbered input() prompt."""
    from hermes_cli import main as main_mod

    def _picker_declines(title, choices, default, description=None):
        # -1 is what the helper treats as "no selection made in the picker".
        return -1

    def _typed_two(prompt=""):
        return "2"

    monkeypatch.setattr("hermes_cli.setup._curses_prompt_choice", _picker_declines)
    monkeypatch.setattr("builtins.input", _typed_two)

    selection = main_mod._prompt_auth_credentials_choice("Credentials:")
    assert selection == "reauth"

View file

@ -338,7 +338,7 @@ class TestCaptureResponse:
from tools.computer_use.backend import CaptureResult
from tools.computer_use import tool as cu_tool
fake_png = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mNkYAAAAAYAAjCB0C8AAAAASUVORK5CYII="
fake_png = "iVBORw0KGgoAAAANSUhEUgAAAAgAAAAICAYAAADED76LAAAADUlEQVR4nGNgGAUgAAABCAABgukLHQAAAABJRU5ErkJggg=="
class FakeBackend:
def start(self): pass
@ -372,11 +372,41 @@ class TestCaptureResponse:
assert any(p.get("type") == "image_url" for p in out["content"])
assert any(p.get("type") == "text" for p in out["content"])
def test_capture_tiny_image_returns_text_json(self):
    """A screenshot under the 8px minimum is dropped and the capture comes back as JSON text."""
    from tools.computer_use.backend import CaptureResult, UIElement
    from tools.computer_use import tool as cu_tool

    # PNG whose IHDR declares 2x2 pixels, i.e. below the provider minimum.
    undersized_png = "iVBORw0KGgoAAAANSUhEUgAAAAIAAAACCAYAAABytg0kAAAAC0lEQVR4nGNgQAcAABIAAXfx+gAAAAAASUVORK5CYII="
    continue_button = UIElement(
        index=1,
        role="AXButton",
        label="Continue",
        bounds=(10, 20, 30, 30),
    )
    # width/height are deliberately 0 so the reported size must come from the PNG.
    capture = CaptureResult(
        mode="som",
        width=0,
        height=0,
        png_b64=undersized_png,
        elements=[continue_button],
        app="Safari",
        window_title="Example",
        png_bytes_len=68,
    )

    with patch.object(
        cu_tool, "_should_route_through_aux_vision", return_value=False
    ):
        response = cu_tool._capture_response(capture)

    payload = json.loads(response)
    assert payload["width"] == 2
    assert payload["height"] == 2
    assert "screenshot omitted" in payload["summary"]
    assert payload["elements"][0]["label"] == "Continue"
def test_capture_som_with_elements_formats_index(self):
from tools.computer_use.backend import CaptureResult, UIElement
from tools.computer_use import tool as cu_tool
fake_png = "iVBORw0KGgo="
fake_png = "iVBORw0KGgoAAAANSUhEUgAAAAgAAAAICAYAAADED76LAAAADUlEQVR4nGNgGAUgAAABCAABgukLHQAAAABJRU5ErkJggg=="
class FakeBackend:
def start(self): pass

View file

@ -33,10 +33,10 @@ import pytest
# Fixtures / helpers
# ---------------------------------------------------------------------------
# 1×1 PNG (transparent) — minimal bytes that decode cleanly.
# 8×8 PNG (transparent) — minimal provider-acceptable bytes that decode cleanly.
_PNG_B64 = (
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42m"
"NkYAAAAAYAAjCB0C8AAAAASUVORK5CYII="
"iVBORw0KGgoAAAANSUhEUgAAAAgAAAAICAYAAADED76LAAAADUlEQVR4nG"
"NgGAUgAAABCAABgukLHQAAAABJRU5ErkJggg=="
)
# 1×1 JPEG — used to verify mime detection works for either stream type.

View file

@ -32,10 +32,12 @@ For captures / actions with `capture_after=True`:
from __future__ import annotations
import base64
import json
import logging
import os
import re
import struct
import sys
import threading
from typing import Any, Dict, List, Optional, Tuple
@ -429,6 +431,61 @@ _DEFAULT_MAX_ELEMENTS = 100
# call passing a very large integer would silently disable the safeguard and
# reintroduce the original unbounded behavior.
_MAX_ALLOWED_MAX_ELEMENTS = 1000
_MIN_PROVIDER_IMAGE_DIMENSION = 8
def _image_dimensions_from_b64(image_b64: str) -> Optional[Tuple[int, int]]:
    """Return ``(width, height)`` parsed from a base64-encoded PNG or JPEG.

    Some providers reject images below 8x8 before the model sees the tool
    result. Inspecting the encoded bytes here lets computer_use fall back to
    its AX/SOM text payload instead of sending an unusable placeholder.

    Args:
        image_b64: Base64 text of the image bytes.

    Returns:
        The pixel dimensions declared in the image header, or ``None`` when
        the input is empty, is not decodable base64, is neither PNG nor JPEG,
        or the header is truncated/malformed.
    """
    if not image_b64:
        return None
    try:
        raw = base64.b64decode(image_b64, validate=False)
    except (ValueError, TypeError):
        # binascii.Error (bad padding) is a ValueError; TypeError covers
        # non-string input. Either way there is nothing to measure.
        return None

    # PNG: 8-byte signature, then the first chunk (4-byte length, 4-byte type,
    # data). Only trust bytes 16..24 as width/height when that first chunk
    # really is IHDR; otherwise they are unrelated data.
    if raw.startswith(b"\x89PNG\r\n\x1a\n"):
        if len(raw) < 24 or raw[12:16] != b"IHDR":
            return None
        width, height = struct.unpack_from(">II", raw, 16)
        return width, height

    # JPEG: walk the marker segments until a start-of-frame marker is found.
    if raw.startswith(b"\xff\xd8") and len(raw) > 4:
        # Markers that carry no length field: TEM, RST0-RST7, SOI, EOI.
        standalone_markers = frozenset({0x01, 0xD8, 0xD9, *range(0xD0, 0xD8)})
        # SOF0-SOF15 minus DHT (0xC4), JPG (0xC8) and DAC (0xCC).
        frame_markers = frozenset({
            0xC0, 0xC1, 0xC2, 0xC3, 0xC5, 0xC6, 0xC7,
            0xC9, 0xCA, 0xCB, 0xCD, 0xCE, 0xCF,
        })
        size = len(raw)
        i = 2
        while i + 9 < size:
            if raw[i] != 0xFF:
                i += 1
                continue
            marker = raw[i + 1]
            i += 2
            # Any number of 0xFF fill bytes may precede the marker code.
            while marker == 0xFF and i < size:
                marker = raw[i]
                i += 1
            if marker in standalone_markers:
                continue
            if marker == 0xDA:
                # Start of scan: entropy-coded data follows, no SOF beyond.
                break
            if i + 2 > size:
                break
            segment_len = int.from_bytes(raw[i:i + 2], "big")
            if segment_len < 2 or i + segment_len > size:
                break
            if marker in frame_markers and segment_len >= 7:
                # Segment layout: length(2) precision(1) height(2) width(2).
                height = int.from_bytes(raw[i + 3:i + 5], "big")
                width = int.from_bytes(raw[i + 5:i + 7], "big")
                return width, height
            i += segment_len
    return None
def _coerce_max_elements(value: Any) -> int:
@ -457,6 +514,16 @@ def _capture_response(cap: CaptureResult, max_elements: int = _DEFAULT_MAX_ELEME
total_elements = len(cap.elements)
visible_elements = cap.elements[:max_elements]
truncated_elements = max(0, total_elements - len(visible_elements))
image_dimensions = _image_dimensions_from_b64(cap.png_b64 or "") if cap.png_b64 else None
response_width = image_dimensions[0] if image_dimensions else cap.width
response_height = image_dimensions[1] if image_dimensions else cap.height
image_too_small = bool(
image_dimensions
and (
image_dimensions[0] < _MIN_PROVIDER_IMAGE_DIMENSION
or image_dimensions[1] < _MIN_PROVIDER_IMAGE_DIMENSION
)
)
# Index only what's actually surfaced in the response — otherwise the
# human-readable summary references element indices the model cannot
@ -464,7 +531,7 @@ def _capture_response(cap: CaptureResult, max_elements: int = _DEFAULT_MAX_ELEME
# 40-line index window).
element_index = _format_elements(visible_elements)
summary_lines = [
f"capture mode={cap.mode} {cap.width}x{cap.height}"
f"capture mode={cap.mode} {response_width}x{response_height}"
+ (f" app={cap.app}" if cap.app else "")
+ (f" window={cap.window_title!r}" if cap.window_title else ""),
f"{total_elements} interactable element(s):",
@ -476,9 +543,15 @@ def _capture_response(cap: CaptureResult, max_elements: int = _DEFAULT_MAX_ELEME
# selected) has a valid value to hand to _route_capture_through_aux_vision.
# The AX path appends the "truncated to N of M" note to summary_lines
# below and rebuilds; the multimodal path keeps this version untouched.
if image_too_small:
summary_lines.append(
f" (screenshot omitted: {image_dimensions[0]}x{image_dimensions[1]} "
f"is below the {_MIN_PROVIDER_IMAGE_DIMENSION}x{_MIN_PROVIDER_IMAGE_DIMENSION} "
"provider minimum)"
)
summary = "\n".join(summary_lines)
if cap.png_b64 and cap.mode != "ax":
if cap.png_b64 and cap.mode != "ax" and not image_too_small:
# Decide whether to hand the screenshot to the auxiliary.vision
# pipeline (text-only result) or keep the multimodal envelope (main
# model handles vision natively). Issue #24015: previously the
@ -510,7 +583,7 @@ def _capture_response(cap: CaptureResult, max_elements: int = _DEFAULT_MAX_ELEME
"image_url": {"url": f"data:{_mime};base64,{cap.png_b64}"}},
],
"text_summary": summary,
"meta": {"mode": cap.mode, "width": cap.width, "height": cap.height,
"meta": {"mode": cap.mode, "width": response_width, "height": response_height,
"elements": total_elements, "png_bytes": cap.png_bytes_len},
}
# AX-only (or image-missing fallback): text path actually carries the
@ -523,8 +596,8 @@ def _capture_response(cap: CaptureResult, max_elements: int = _DEFAULT_MAX_ELEME
summary = "\n".join(summary_lines)
payload: Dict[str, Any] = {
"mode": cap.mode,
"width": cap.width,
"height": cap.height,
"width": response_width,
"height": response_height,
"app": cap.app,
"window_title": cap.window_title,
"elements": [_element_to_dict(e) for e in visible_elements],