test(tui): fix stale mocks + xdist flakes in TUI test suite

All 61 TUI-related tests green across 3 consecutive xdist runs.

tests/tui_gateway/test_protocol.py:
- rename `get_messages` → `get_messages_as_conversation` on mock DB (method
  was renamed in the real backend, test was still stubbing the old name)
- update tool-message shape expectation: `{role, name, content}` matches
  current `_history_to_messages` output, not the legacy `{role, text}`

tests/hermes_cli/test_tui_resume_flow.py:
- `cmd_chat` grew a first-run provider gate that bailed out with "Run: hermes
  setup" before `_launch_tui` was ever reached; 3 tests stubbed
  `_resolve_last_session` + `_launch_tui` but not the gate
- factored a `main_mod` fixture that stubs `_has_any_provider_configured`,
  reused by all three tests
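
  A sketch of the shared fixture (module path and gate stub value are
  assumptions, not verbatim from the test file):

      import pytest

      @pytest.fixture
      def main_mod(monkeypatch):
          import hermes_cli.main as main  # assumed location of cmd_chat
          # Stub the first-run provider gate so cmd_chat reaches _launch_tui.
          monkeypatch.setattr(main, "_has_any_provider_configured",
                              lambda: True)
          return main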

tests/test_tui_gateway_server.py:
- `test_config_set_personality_resets_history_and_returns_info` was flaky
  under xdist because the real `_write_config_key` touches
  `~/.hermes/config.yaml`, racing with any other worker that writes
  config. Stub it in the test.
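
  A minimal form of that stub (module handle and signature assumed):

      monkeypatch.setattr(server_mod, "_write_config_key",
                          lambda *a, **k: None)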
Brooklyn Nicholson 2026-04-16 19:07:49 -05:00
commit 7f1204840d
36 changed files with 4514 additions and 52 deletions


@@ -0,0 +1,764 @@
"""OpenAI-compatible facade that talks to Google's Cloud Code Assist backend.
This adapter lets Hermes use the ``google-gemini-cli`` provider as if it were
a standard OpenAI-shaped chat completion endpoint, while the underlying HTTP
traffic goes to ``cloudcode-pa.googleapis.com/v1internal:{generateContent,
streamGenerateContent}`` with a Bearer access token obtained via OAuth PKCE.
Architecture
------------
- ``GeminiCloudCodeClient`` exposes ``.chat.completions.create(**kwargs)``
mirroring the subset of the OpenAI SDK that ``run_agent.py`` uses.
- Incoming OpenAI ``messages[]`` / ``tools[]`` / ``tool_choice`` are translated
to Gemini's native ``contents[]`` / ``tools[].functionDeclarations`` /
``toolConfig`` / ``systemInstruction`` shape.
- The request body is wrapped ``{project, model, user_prompt_id, request}``
per Code Assist API expectations.
- Responses (``candidates[].content.parts[]``) are converted back to
OpenAI ``choices[0].message`` shape with ``content`` + ``tool_calls``.
- Streaming uses SSE (``?alt=sse``) and yields OpenAI-shaped delta chunks.
Attribution
-----------
Translation semantics follow jenslys/opencode-gemini-auth (MIT) and the public
Gemini API docs. Request envelope shape
(``{project, model, user_prompt_id, request}``) is documented nowhere; it is
reverse-engineered from the opencode-gemini-auth and clawdbot implementations.
"""
from __future__ import annotations
import json
import logging
import os
import time
import uuid
from types import SimpleNamespace
from typing import Any, Dict, Iterator, List, Optional
import httpx
from agent import google_oauth
from agent.google_code_assist import (
CODE_ASSIST_ENDPOINT,
FREE_TIER_ID,
CodeAssistError,
ProjectContext,
resolve_project_context,
)
logger = logging.getLogger(__name__)
# =============================================================================
# Request translation: OpenAI → Gemini
# =============================================================================
_ROLE_MAP_OPENAI_TO_GEMINI = {
"user": "user",
"assistant": "model",
"system": "user", # handled separately via systemInstruction
"tool": "user", # functionResponse is wrapped in a user-role turn
"function": "user",
}
def _coerce_content_to_text(content: Any) -> str:
"""OpenAI content may be str or a list of parts; reduce to plain text."""
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
pieces: List[str] = []
for p in content:
if isinstance(p, str):
pieces.append(p)
elif isinstance(p, dict):
if p.get("type") == "text" and isinstance(p.get("text"), str):
pieces.append(p["text"])
# Multimodal (image_url, etc.) — stub for now; log and skip
elif p.get("type") in ("image_url", "input_audio"):
logger.debug("Dropping multimodal part (not yet supported): %s", p.get("type"))
return "\n".join(pieces)
return str(content)
def _translate_tool_call_to_gemini(tool_call: Dict[str, Any]) -> Dict[str, Any]:
"""OpenAI tool_call -> Gemini functionCall part."""
fn = tool_call.get("function") or {}
args_raw = fn.get("arguments", "")
try:
args = json.loads(args_raw) if isinstance(args_raw, str) and args_raw else {}
except json.JSONDecodeError:
args = {"_raw": args_raw}
if not isinstance(args, dict):
args = {"_value": args}
return {
"functionCall": {
"name": fn.get("name") or "",
"args": args,
},
# Sentinel signature — matches opencode-gemini-auth's approach.
# Without this, Code Assist rejects function calls that originated
# outside its own chain.
"thoughtSignature": "skip_thought_signature_validator",
}
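# Example (illustrative names/values): a tool_call of
#   {"function": {"name": "get_weather", "arguments": '{"city": "Paris"}'}}
# translates to
#   {"functionCall": {"name": "get_weather", "args": {"city": "Paris"}},
#    "thoughtSignature": "skip_thought_signature_validator"}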
def _translate_tool_result_to_gemini(message: Dict[str, Any]) -> Dict[str, Any]:
"""OpenAI tool-role message -> Gemini functionResponse part.
The function name isn't in the OpenAI tool message directly; it must be
passed via the assistant message that issued the call. For simplicity we
look up ``name`` on the message (the OpenAI SDK copies it there), falling
back to the ``tool_call_id``.
"""
name = str(message.get("name") or message.get("tool_call_id") or "tool")
content = _coerce_content_to_text(message.get("content"))
# Gemini expects the response as a dict under `response`. We wrap plain
# text in {"output": "..."}.
try:
parsed = json.loads(content) if content.strip().startswith(("{", "[")) else None
except json.JSONDecodeError:
parsed = None
response = parsed if isinstance(parsed, dict) else {"output": content}
return {
"functionResponse": {
"name": name,
"response": response,
},
}
def _build_gemini_contents(
messages: List[Dict[str, Any]],
) -> tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
"""Convert OpenAI messages[] to Gemini contents[] + systemInstruction."""
system_text_parts: List[str] = []
contents: List[Dict[str, Any]] = []
for msg in messages:
if not isinstance(msg, dict):
continue
role = str(msg.get("role") or "user")
if role == "system":
system_text_parts.append(_coerce_content_to_text(msg.get("content")))
continue
# Tool result message — emit a user-role turn with functionResponse
if role == "tool" or role == "function":
contents.append({
"role": "user",
"parts": [_translate_tool_result_to_gemini(msg)],
})
continue
gemini_role = _ROLE_MAP_OPENAI_TO_GEMINI.get(role, "user")
parts: List[Dict[str, Any]] = []
text = _coerce_content_to_text(msg.get("content"))
if text:
parts.append({"text": text})
# Assistant messages can carry tool_calls
tool_calls = msg.get("tool_calls") or []
if isinstance(tool_calls, list):
for tc in tool_calls:
if isinstance(tc, dict):
parts.append(_translate_tool_call_to_gemini(tc))
if not parts:
# Gemini rejects empty parts; skip the turn entirely
continue
contents.append({"role": gemini_role, "parts": parts})
system_instruction: Optional[Dict[str, Any]] = None
joined_system = "\n".join(p for p in system_text_parts if p).strip()
if joined_system:
system_instruction = {
"role": "system",
"parts": [{"text": joined_system}],
}
return contents, system_instruction
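# Example (illustrative): [{"role": "system", "content": "Be brief."},
# {"role": "user", "content": "hi"}] becomes
#   contents           = [{"role": "user", "parts": [{"text": "hi"}]}]
#   system_instruction = {"role": "system", "parts": [{"text": "Be brief."}]}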
def _translate_tools_to_gemini(tools: Any) -> List[Dict[str, Any]]:
"""OpenAI tools[] -> Gemini tools[].functionDeclarations[]."""
if not isinstance(tools, list) or not tools:
return []
declarations: List[Dict[str, Any]] = []
for t in tools:
if not isinstance(t, dict):
continue
fn = t.get("function") or {}
if not isinstance(fn, dict):
continue
name = fn.get("name")
if not name:
continue
decl = {"name": str(name)}
if fn.get("description"):
decl["description"] = str(fn["description"])
params = fn.get("parameters")
if isinstance(params, dict):
decl["parameters"] = params
declarations.append(decl)
if not declarations:
return []
return [{"functionDeclarations": declarations}]
def _translate_tool_choice_to_gemini(tool_choice: Any) -> Optional[Dict[str, Any]]:
"""OpenAI tool_choice -> Gemini toolConfig.functionCallingConfig."""
if tool_choice is None:
return None
if isinstance(tool_choice, str):
if tool_choice == "auto":
return {"functionCallingConfig": {"mode": "AUTO"}}
if tool_choice == "required":
return {"functionCallingConfig": {"mode": "ANY"}}
if tool_choice == "none":
return {"functionCallingConfig": {"mode": "NONE"}}
if isinstance(tool_choice, dict):
fn = tool_choice.get("function") or {}
name = fn.get("name")
if name:
return {
"functionCallingConfig": {
"mode": "ANY",
"allowedFunctionNames": [str(name)],
},
}
return None
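# Example (illustrative): {"type": "function", "function": {"name": "search"}}
# maps to {"functionCallingConfig": {"mode": "ANY",
# "allowedFunctionNames": ["search"]}}, while the string "required" maps to
# {"functionCallingConfig": {"mode": "ANY"}}.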
def _normalize_thinking_config(config: Any) -> Optional[Dict[str, Any]]:
"""Accept thinkingBudget / thinkingLevel / includeThoughts (+ snake_case)."""
if not isinstance(config, dict) or not config:
return None
budget = config.get("thinkingBudget", config.get("thinking_budget"))
level = config.get("thinkingLevel", config.get("thinking_level"))
include = config.get("includeThoughts", config.get("include_thoughts"))
normalized: Dict[str, Any] = {}
if isinstance(budget, (int, float)):
normalized["thinkingBudget"] = int(budget)
if isinstance(level, str) and level.strip():
normalized["thinkingLevel"] = level.strip().lower()
if isinstance(include, bool):
normalized["includeThoughts"] = include
return normalized or None
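# Example (illustrative): {"thinking_budget": 2048, "include_thoughts": True}
# normalizes to {"thinkingBudget": 2048, "includeThoughts": True}.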
def build_gemini_request(
*,
messages: List[Dict[str, Any]],
tools: Any = None,
tool_choice: Any = None,
temperature: Optional[float] = None,
max_tokens: Optional[int] = None,
top_p: Optional[float] = None,
stop: Any = None,
thinking_config: Any = None,
) -> Dict[str, Any]:
"""Build the inner Gemini request body (goes inside ``request`` wrapper)."""
contents, system_instruction = _build_gemini_contents(messages)
body: Dict[str, Any] = {"contents": contents}
if system_instruction is not None:
body["systemInstruction"] = system_instruction
gemini_tools = _translate_tools_to_gemini(tools)
if gemini_tools:
body["tools"] = gemini_tools
tool_cfg = _translate_tool_choice_to_gemini(tool_choice)
if tool_cfg is not None:
body["toolConfig"] = tool_cfg
generation_config: Dict[str, Any] = {}
if isinstance(temperature, (int, float)):
generation_config["temperature"] = float(temperature)
if isinstance(max_tokens, int) and max_tokens > 0:
generation_config["maxOutputTokens"] = max_tokens
if isinstance(top_p, (int, float)):
generation_config["topP"] = float(top_p)
if isinstance(stop, str) and stop:
generation_config["stopSequences"] = [stop]
elif isinstance(stop, list) and stop:
generation_config["stopSequences"] = [str(s) for s in stop if s]
normalized_thinking = _normalize_thinking_config(thinking_config)
if normalized_thinking:
generation_config["thinkingConfig"] = normalized_thinking
if generation_config:
body["generationConfig"] = generation_config
return body
def wrap_code_assist_request(
*,
project_id: str,
model: str,
inner_request: Dict[str, Any],
user_prompt_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Wrap the inner Gemini request in the Code Assist envelope."""
return {
"project": project_id,
"model": model,
"user_prompt_id": user_prompt_id or str(uuid.uuid4()),
"request": inner_request,
}
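# Example envelope (project id and uuid illustrative):
#   {"project": "my-gcp-project", "model": "gemini-2.5-flash",
#    "user_prompt_id": "8d1f…", "request": {"contents": [...]}}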
# =============================================================================
# Response translation: Gemini → OpenAI
# =============================================================================
def _translate_gemini_response(
resp: Dict[str, Any],
model: str,
) -> SimpleNamespace:
"""Non-streaming Gemini response -> OpenAI-shaped SimpleNamespace.
Code Assist wraps the actual Gemini response inside ``response``, so we
unwrap it first if present.
"""
inner = resp.get("response") if isinstance(resp.get("response"), dict) else resp
candidates = inner.get("candidates") or []
if not isinstance(candidates, list) or not candidates:
return _empty_response(model)
cand = candidates[0]
content_obj = cand.get("content") if isinstance(cand, dict) else {}
parts = content_obj.get("parts") if isinstance(content_obj, dict) else []
text_pieces: List[str] = []
reasoning_pieces: List[str] = []
tool_calls: List[SimpleNamespace] = []
for i, part in enumerate(parts or []):
if not isinstance(part, dict):
continue
# Thought parts are model's internal reasoning — surface as reasoning,
# don't mix into content.
if part.get("thought") is True:
if isinstance(part.get("text"), str):
reasoning_pieces.append(part["text"])
continue
if isinstance(part.get("text"), str):
text_pieces.append(part["text"])
continue
fc = part.get("functionCall")
if isinstance(fc, dict) and fc.get("name"):
try:
args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False)
except (TypeError, ValueError):
args_str = "{}"
tool_calls.append(SimpleNamespace(
id=f"call_{uuid.uuid4().hex[:12]}",
type="function",
index=i,
function=SimpleNamespace(name=str(fc["name"]), arguments=args_str),
))
finish_reason = "tool_calls" if tool_calls else _map_gemini_finish_reason(
str(cand.get("finishReason") or "")
)
usage_meta = inner.get("usageMetadata") or {}
usage = SimpleNamespace(
prompt_tokens=int(usage_meta.get("promptTokenCount") or 0),
completion_tokens=int(usage_meta.get("candidatesTokenCount") or 0),
total_tokens=int(usage_meta.get("totalTokenCount") or 0),
prompt_tokens_details=SimpleNamespace(
cached_tokens=int(usage_meta.get("cachedContentTokenCount") or 0),
),
)
message = SimpleNamespace(
role="assistant",
content="".join(text_pieces) if text_pieces else None,
tool_calls=tool_calls or None,
reasoning="".join(reasoning_pieces) or None,
reasoning_content="".join(reasoning_pieces) or None,
reasoning_details=None,
)
choice = SimpleNamespace(
index=0,
message=message,
finish_reason=finish_reason,
)
return SimpleNamespace(
id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
object="chat.completion",
created=int(time.time()),
model=model,
choices=[choice],
usage=usage,
)
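# Example (illustrative): {"response": {"candidates": [{"content": {"parts":
# [{"text": "Hello"}]}, "finishReason": "STOP"}]}} translates to an object
# where .choices[0].message.content == "Hello" and finish_reason == "stop".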
def _empty_response(model: str) -> SimpleNamespace:
message = SimpleNamespace(
role="assistant", content="", tool_calls=None,
reasoning=None, reasoning_content=None, reasoning_details=None,
)
choice = SimpleNamespace(index=0, message=message, finish_reason="stop")
usage = SimpleNamespace(
prompt_tokens=0, completion_tokens=0, total_tokens=0,
prompt_tokens_details=SimpleNamespace(cached_tokens=0),
)
return SimpleNamespace(
id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
object="chat.completion",
created=int(time.time()),
model=model,
choices=[choice],
usage=usage,
)
def _map_gemini_finish_reason(reason: str) -> str:
mapping = {
"STOP": "stop",
"MAX_TOKENS": "length",
"SAFETY": "content_filter",
"RECITATION": "content_filter",
"OTHER": "stop",
}
return mapping.get(reason.upper(), "stop")
# =============================================================================
# Streaming SSE iterator
# =============================================================================
class _GeminiStreamChunk(SimpleNamespace):
"""Mimics an OpenAI ChatCompletionChunk with .choices[0].delta."""
pass
def _make_stream_chunk(
*,
model: str,
content: str = "",
tool_call_delta: Optional[Dict[str, Any]] = None,
finish_reason: Optional[str] = None,
reasoning: str = "",
) -> _GeminiStreamChunk:
delta_kwargs: Dict[str, Any] = {"role": "assistant"}
if content:
delta_kwargs["content"] = content
if tool_call_delta is not None:
delta_kwargs["tool_calls"] = [SimpleNamespace(
index=tool_call_delta.get("index", 0),
id=tool_call_delta.get("id") or f"call_{uuid.uuid4().hex[:12]}",
type="function",
function=SimpleNamespace(
name=tool_call_delta.get("name") or "",
arguments=tool_call_delta.get("arguments") or "",
),
)]
if reasoning:
delta_kwargs["reasoning"] = reasoning
delta_kwargs["reasoning_content"] = reasoning
delta = SimpleNamespace(**delta_kwargs)
choice = SimpleNamespace(index=0, delta=delta, finish_reason=finish_reason)
return _GeminiStreamChunk(
id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
object="chat.completion.chunk",
created=int(time.time()),
model=model,
choices=[choice],
usage=None,
)
def _iter_sse_events(response: httpx.Response) -> Iterator[Dict[str, Any]]:
"""Parse Server-Sent Events from an httpx streaming response."""
buffer = ""
for chunk in response.iter_text():
if not chunk:
continue
buffer += chunk
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.rstrip("\r")
if not line:
continue
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
return
try:
yield json.loads(data)
except json.JSONDecodeError:
logger.debug("Non-JSON SSE line: %s", data[:200])
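# Expected wire format (one JSON object per `data:` line; illustrative):
#   data: {"response": {"candidates": [{"content": {"parts": [{"text": "Hi"}]}}]}}
#   data: [DONE]
# Blank lines between events are skipped; "[DONE]" terminates the iterator.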
def _translate_stream_event(
event: Dict[str, Any],
model: str,
tool_call_indices: Dict[str, int],
) -> List[_GeminiStreamChunk]:
"""Unwrap Code Assist envelope and emit OpenAI-shaped chunk(s)."""
inner = event.get("response") if isinstance(event.get("response"), dict) else event
candidates = inner.get("candidates") or []
if not candidates:
return []
cand = candidates[0]
if not isinstance(cand, dict):
return []
chunks: List[_GeminiStreamChunk] = []
content = cand.get("content") or {}
parts = content.get("parts") if isinstance(content, dict) else []
for part in parts or []:
if not isinstance(part, dict):
continue
if part.get("thought") is True and isinstance(part.get("text"), str):
chunks.append(_make_stream_chunk(
model=model, reasoning=part["text"],
))
continue
if isinstance(part.get("text"), str) and part["text"]:
chunks.append(_make_stream_chunk(model=model, content=part["text"]))
fc = part.get("functionCall")
if isinstance(fc, dict) and fc.get("name"):
name = str(fc["name"])
idx = tool_call_indices.setdefault(name, len(tool_call_indices))
try:
args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False)
except (TypeError, ValueError):
args_str = "{}"
chunks.append(_make_stream_chunk(
model=model,
tool_call_delta={
"index": idx,
"name": name,
"arguments": args_str,
},
))
finish_reason_raw = str(cand.get("finishReason") or "")
if finish_reason_raw:
mapped = _map_gemini_finish_reason(finish_reason_raw)
if tool_call_indices:
mapped = "tool_calls"
chunks.append(_make_stream_chunk(model=model, finish_reason=mapped))
return chunks
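# Example (illustrative): an event {"response": {"candidates": [{"content":
# {"parts": [{"text": "Hi"}]}, "finishReason": "STOP"}]}} yields two chunks:
# one with delta.content == "Hi", then one carrying finish_reason == "stop".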
# =============================================================================
# GeminiCloudCodeClient — OpenAI-compatible facade
# =============================================================================
MARKER_BASE_URL = "cloudcode-pa://google"
class _GeminiChatCompletions:
def __init__(self, client: "GeminiCloudCodeClient"):
self._client = client
def create(self, **kwargs: Any) -> Any:
return self._client._create_chat_completion(**kwargs)
class _GeminiChatNamespace:
def __init__(self, client: "GeminiCloudCodeClient"):
self.completions = _GeminiChatCompletions(client)
class GeminiCloudCodeClient:
"""Minimal OpenAI-SDK-compatible facade over Code Assist v1internal."""
def __init__(
self,
*,
api_key: Optional[str] = None,
base_url: Optional[str] = None,
default_headers: Optional[Dict[str, str]] = None,
project_id: str = "",
**_: Any,
):
# `api_key` here is a dummy — real auth is the OAuth access token
# fetched on every call via agent.google_oauth.get_valid_access_token().
# We accept the kwarg for openai.OpenAI interface parity.
self.api_key = api_key or "google-oauth"
self.base_url = base_url or MARKER_BASE_URL
self._default_headers = dict(default_headers or {})
self._configured_project_id = project_id
self._project_context: Optional[ProjectContext] = None
self._project_context_lock = False # simple single-thread guard
self.chat = _GeminiChatNamespace(self)
self.is_closed = False
self._http = httpx.Client(timeout=httpx.Timeout(connect=15.0, read=600.0, write=30.0, pool=30.0))
def close(self) -> None:
self.is_closed = True
try:
self._http.close()
except Exception:
pass
# Implement the OpenAI SDK's context-manager-ish closure check
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def _ensure_project_context(self, access_token: str, model: str) -> ProjectContext:
"""Lazily resolve and cache the project context for this client."""
if self._project_context is not None:
return self._project_context
env_project = google_oauth.resolve_project_id_from_env()
creds = google_oauth.load_credentials()
stored_project = creds.project_id if creds else ""
# Prefer what's already baked into the creds
if stored_project:
self._project_context = ProjectContext(
project_id=stored_project,
managed_project_id=creds.managed_project_id if creds else "",
tier_id="",
source="stored",
)
return self._project_context
ctx = resolve_project_context(
access_token,
configured_project_id=self._configured_project_id,
env_project_id=env_project,
user_agent_model=model,
)
# Persist discovered project back to the creds file so the next
# session doesn't re-run the discovery.
if ctx.project_id or ctx.managed_project_id:
google_oauth.update_project_ids(
project_id=ctx.project_id,
managed_project_id=ctx.managed_project_id,
)
self._project_context = ctx
return ctx
def _create_chat_completion(
self,
*,
model: str = "gemini-2.5-flash",
messages: Optional[List[Dict[str, Any]]] = None,
stream: bool = False,
tools: Any = None,
tool_choice: Any = None,
temperature: Optional[float] = None,
max_tokens: Optional[int] = None,
top_p: Optional[float] = None,
stop: Any = None,
extra_body: Optional[Dict[str, Any]] = None,
timeout: Any = None,
**_: Any,
) -> Any:
access_token = google_oauth.get_valid_access_token()
ctx = self._ensure_project_context(access_token, model)
thinking_config = None
if isinstance(extra_body, dict):
thinking_config = extra_body.get("thinking_config") or extra_body.get("thinkingConfig")
inner = build_gemini_request(
messages=messages or [],
tools=tools,
tool_choice=tool_choice,
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
stop=stop,
thinking_config=thinking_config,
)
wrapped = wrap_code_assist_request(
project_id=ctx.project_id,
model=model,
inner_request=inner,
)
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {access_token}",
"User-Agent": "hermes-agent (gemini-cli-compat)",
"X-Goog-Api-Client": "gl-python/hermes",
"x-activity-request-id": str(uuid.uuid4()),
}
headers.update(self._default_headers)
if stream:
return self._stream_completion(model=model, wrapped=wrapped, headers=headers)
url = f"{CODE_ASSIST_ENDPOINT}/v1internal:generateContent"
response = self._http.post(url, json=wrapped, headers=headers)
if response.status_code != 200:
raise _gemini_http_error(response)
try:
payload = response.json()
except ValueError as exc:
raise CodeAssistError(
f"Invalid JSON from Code Assist: {exc}",
code="code_assist_invalid_json",
) from exc
return _translate_gemini_response(payload, model=model)
def _stream_completion(
self,
*,
model: str,
wrapped: Dict[str, Any],
headers: Dict[str, str],
) -> Iterator[_GeminiStreamChunk]:
"""Generator that yields OpenAI-shaped streaming chunks."""
url = f"{CODE_ASSIST_ENDPOINT}/v1internal:streamGenerateContent?alt=sse"
stream_headers = dict(headers)
stream_headers["Accept"] = "text/event-stream"
def _generator() -> Iterator[_GeminiStreamChunk]:
try:
with self._http.stream("POST", url, json=wrapped, headers=stream_headers) as response:
if response.status_code != 200:
# Materialize error body for better diagnostics
response.read()
raise _gemini_http_error(response)
tool_call_indices: Dict[str, int] = {}
for event in _iter_sse_events(response):
for chunk in _translate_stream_event(event, model, tool_call_indices):
yield chunk
except httpx.HTTPError as exc:
raise CodeAssistError(
f"Streaming request failed: {exc}",
code="code_assist_stream_error",
) from exc
return _generator()
def _gemini_http_error(response: httpx.Response) -> CodeAssistError:
status = response.status_code
try:
body = response.text[:500]
except Exception:
body = ""
# Let run_agent's retry logic see auth errors as rotatable via `api_key`
code = f"code_assist_http_{status}"
if status == 401:
code = "code_assist_unauthorized"
elif status == 429:
code = "code_assist_rate_limited"
return CodeAssistError(
f"Code Assist returned HTTP {status}: {body}",
code=code,
)
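# Usage sketch (assumes agent.google_oauth already holds valid credentials;
# model name illustrative):
#   client = GeminiCloudCodeClient()
#   resp = client.chat.completions.create(
#       model="gemini-2.5-flash",
#       messages=[{"role": "user", "content": "hello"}],
#   )
#   print(resp.choices[0].message.content)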

agent/google_code_assist.py (new file, 417 lines)

@@ -0,0 +1,417 @@
"""Google Code Assist API client — project discovery, onboarding, quota.
The Code Assist API powers Google's official gemini-cli. It sits at
``cloudcode-pa.googleapis.com`` and provides:
- Free tier access (generous daily quota) for personal Google accounts
- Paid tier access via GCP projects with billing / Workspace / Standard / Enterprise
This module handles the control-plane dance needed before inference:
1. ``load_code_assist()`` probes the user's account to learn what tier they're
on and whether a ``cloudaicompanionProject`` is already assigned.
2. ``onboard_user()`` provisions the user if they haven't been onboarded yet
(new account, fresh free tier, etc.); call it with the chosen tier + project
id. Supports LRO polling for slow provisioning.
3. ``retrieve_user_quota()`` fetches the ``buckets[]`` array showing remaining
quota per model, used by the ``/gquota`` slash command.
VPC-SC handling: enterprise accounts under a VPC Service Controls perimeter
will get ``SECURITY_POLICY_VIOLATED`` on ``load_code_assist``. We catch this
and force the account to ``standard-tier`` so the call chain still succeeds.
Derived from opencode-gemini-auth (MIT) and clawdbot/extensions/google. The
request/response shapes are specific to Google's internal Code Assist API,
documented nowhere public; we copy them from the reference implementations.
"""
from __future__ import annotations
import json
import logging
import os
import time
import urllib.error
import urllib.parse
import urllib.request
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# =============================================================================
# Constants
# =============================================================================
CODE_ASSIST_ENDPOINT = "https://cloudcode-pa.googleapis.com"
# Fallback endpoints tried when prod returns an error during project discovery
FALLBACK_ENDPOINTS = [
"https://daily-cloudcode-pa.sandbox.googleapis.com",
"https://autopush-cloudcode-pa.sandbox.googleapis.com",
]
# Tier identifiers that Google's API uses
FREE_TIER_ID = "free-tier"
LEGACY_TIER_ID = "legacy-tier"
STANDARD_TIER_ID = "standard-tier"
# Default HTTP headers matching gemini-cli's fingerprint.
# Google may reject unrecognized User-Agents on these internal endpoints.
_GEMINI_CLI_USER_AGENT = "google-api-nodejs-client/9.15.1 (gzip)"
_X_GOOG_API_CLIENT = "gl-node/24.0.0"
_DEFAULT_REQUEST_TIMEOUT = 30.0
_ONBOARDING_POLL_ATTEMPTS = 12
_ONBOARDING_POLL_INTERVAL_SECONDS = 5.0
class CodeAssistError(RuntimeError):
def __init__(self, message: str, *, code: str = "code_assist_error") -> None:
super().__init__(message)
self.code = code
class ProjectIdRequiredError(CodeAssistError):
def __init__(self, message: str = "GCP project id required for this tier") -> None:
super().__init__(message, code="code_assist_project_id_required")
# =============================================================================
# HTTP primitive (auth via Bearer token passed per-call)
# =============================================================================
def _build_headers(access_token: str, *, user_agent_model: str = "") -> Dict[str, str]:
ua = _GEMINI_CLI_USER_AGENT
if user_agent_model:
ua = f"{ua} model/{user_agent_model}"
return {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": f"Bearer {access_token}",
"User-Agent": ua,
"X-Goog-Api-Client": _X_GOOG_API_CLIENT,
"x-activity-request-id": str(uuid.uuid4()),
}
def _client_metadata() -> Dict[str, str]:
"""Match Google's gemini-cli exactly — unrecognized metadata may be rejected."""
return {
"ideType": "IDE_UNSPECIFIED",
"platform": "PLATFORM_UNSPECIFIED",
"pluginType": "GEMINI",
}
def _post_json(
url: str,
body: Dict[str, Any],
access_token: str,
*,
timeout: float = _DEFAULT_REQUEST_TIMEOUT,
user_agent_model: str = "",
) -> Dict[str, Any]:
data = json.dumps(body).encode("utf-8")
request = urllib.request.Request(
url, data=data, method="POST",
headers=_build_headers(access_token, user_agent_model=user_agent_model),
)
try:
with urllib.request.urlopen(request, timeout=timeout) as response:
raw = response.read().decode("utf-8", errors="replace")
return json.loads(raw) if raw else {}
except urllib.error.HTTPError as exc:
detail = ""
try:
detail = exc.read().decode("utf-8", errors="replace")
except Exception:
pass
# Special case: VPC-SC violation should be distinguishable
if _is_vpc_sc_violation(detail):
raise CodeAssistError(
f"VPC-SC policy violation: {detail}",
code="code_assist_vpc_sc",
) from exc
raise CodeAssistError(
f"Code Assist HTTP {exc.code}: {detail or exc.reason}",
code=f"code_assist_http_{exc.code}",
) from exc
except urllib.error.URLError as exc:
raise CodeAssistError(
f"Code Assist request failed: {exc}",
code="code_assist_network_error",
) from exc
def _is_vpc_sc_violation(body: str) -> bool:
"""Detect a VPC Service Controls violation from a response body."""
if not body:
return False
try:
parsed = json.loads(body)
except (json.JSONDecodeError, ValueError):
return "SECURITY_POLICY_VIOLATED" in body
# Walk the nested error structure Google uses
error = parsed.get("error") if isinstance(parsed, dict) else None
if not isinstance(error, dict):
return False
details = error.get("details") or []
if isinstance(details, list):
for item in details:
if isinstance(item, dict):
reason = item.get("reason") or ""
if reason == "SECURITY_POLICY_VIOLATED":
return True
msg = str(error.get("message", ""))
return "SECURITY_POLICY_VIOLATED" in msg
# =============================================================================
# load_code_assist — discovers current tier + assigned project
# =============================================================================
@dataclass
class CodeAssistProjectInfo:
"""Result from ``load_code_assist``."""
current_tier_id: str = ""
cloudaicompanion_project: str = "" # Google-managed project (free tier)
allowed_tiers: List[str] = field(default_factory=list)
raw: Dict[str, Any] = field(default_factory=dict)
def load_code_assist(
access_token: str,
*,
project_id: str = "",
user_agent_model: str = "",
) -> CodeAssistProjectInfo:
"""Call ``POST /v1internal:loadCodeAssist`` with prod → sandbox fallback.
Returns whatever tier + project info Google reports. On VPC-SC violations,
returns a synthetic ``standard-tier`` result so the chain can continue.
"""
body: Dict[str, Any] = {
"metadata": {
"duetProject": project_id,
**_client_metadata(),
},
}
if project_id:
body["cloudaicompanionProject"] = project_id
endpoints = [CODE_ASSIST_ENDPOINT] + FALLBACK_ENDPOINTS
last_err: Optional[Exception] = None
for endpoint in endpoints:
url = f"{endpoint}/v1internal:loadCodeAssist"
try:
resp = _post_json(url, body, access_token, user_agent_model=user_agent_model)
return _parse_load_response(resp)
except CodeAssistError as exc:
if exc.code == "code_assist_vpc_sc":
logger.info("VPC-SC violation on %s — defaulting to standard-tier", endpoint)
return CodeAssistProjectInfo(
current_tier_id=STANDARD_TIER_ID,
cloudaicompanion_project=project_id,
)
last_err = exc
logger.warning("loadCodeAssist failed on %s: %s", endpoint, exc)
continue
if last_err:
raise last_err
return CodeAssistProjectInfo()
def _parse_load_response(resp: Dict[str, Any]) -> CodeAssistProjectInfo:
current_tier = resp.get("currentTier") or {}
tier_id = str(current_tier.get("id") or "") if isinstance(current_tier, dict) else ""
project = str(resp.get("cloudaicompanionProject") or "")
allowed = resp.get("allowedTiers") or []
allowed_ids: List[str] = []
if isinstance(allowed, list):
for t in allowed:
if isinstance(t, dict):
tid = str(t.get("id") or "")
if tid:
allowed_ids.append(tid)
return CodeAssistProjectInfo(
current_tier_id=tier_id,
cloudaicompanion_project=project,
allowed_tiers=allowed_ids,
raw=resp,
)
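# Example (illustrative): {"currentTier": {"id": "free-tier"},
# "cloudaicompanionProject": "managed-proj-123",
# "allowedTiers": [{"id": "free-tier"}, {"id": "standard-tier"}]}
# parses to CodeAssistProjectInfo(current_tier_id="free-tier",
# cloudaicompanion_project="managed-proj-123",
# allowed_tiers=["free-tier", "standard-tier"]).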
# =============================================================================
# onboard_user — provisions a new user on a tier (with LRO polling)
# =============================================================================
def onboard_user(
access_token: str,
*,
tier_id: str,
project_id: str = "",
user_agent_model: str = "",
) -> Dict[str, Any]:
"""Call ``POST /v1internal:onboardUser`` to provision the user.
For paid tiers, ``project_id`` is REQUIRED (raises ProjectIdRequiredError).
For free tiers, ``project_id`` is optional; Google will assign one.
Returns the final operation response. Polls ``/v1internal/<name>`` for up
to ``_ONBOARDING_POLL_ATTEMPTS`` × ``_ONBOARDING_POLL_INTERVAL_SECONDS``
(default: 12 × 5s = 1 min).
"""
if tier_id != FREE_TIER_ID and tier_id != LEGACY_TIER_ID and not project_id:
raise ProjectIdRequiredError(
f"Tier {tier_id!r} requires a GCP project id. "
"Set HERMES_GEMINI_PROJECT_ID or GOOGLE_CLOUD_PROJECT."
)
body: Dict[str, Any] = {
"tierId": tier_id,
"metadata": _client_metadata(),
}
if project_id:
body["cloudaicompanionProject"] = project_id
endpoint = CODE_ASSIST_ENDPOINT
url = f"{endpoint}/v1internal:onboardUser"
resp = _post_json(url, body, access_token, user_agent_model=user_agent_model)
# Poll if LRO (long-running operation)
if not resp.get("done"):
op_name = resp.get("name", "")
if not op_name:
return resp
for attempt in range(_ONBOARDING_POLL_ATTEMPTS):
time.sleep(_ONBOARDING_POLL_INTERVAL_SECONDS)
poll_url = f"{endpoint}/v1internal/{op_name}"
try:
poll_resp = _post_json(poll_url, {}, access_token, user_agent_model=user_agent_model)
except CodeAssistError as exc:
logger.warning("Onboarding poll attempt %d failed: %s", attempt + 1, exc)
continue
if poll_resp.get("done"):
return poll_resp
logger.warning("Onboarding did not complete within %d attempts", _ONBOARDING_POLL_ATTEMPTS)
return resp
# =============================================================================
# retrieve_user_quota — for /gquota
# =============================================================================
@dataclass
class QuotaBucket:
model_id: str
token_type: str = ""
remaining_fraction: float = 0.0
reset_time_iso: str = ""
raw: Dict[str, Any] = field(default_factory=dict)
def retrieve_user_quota(
access_token: str,
*,
project_id: str = "",
user_agent_model: str = "",
) -> List[QuotaBucket]:
"""Call ``POST /v1internal:retrieveUserQuota`` and parse ``buckets[]``."""
body: Dict[str, Any] = {}
if project_id:
body["project"] = project_id
url = f"{CODE_ASSIST_ENDPOINT}/v1internal:retrieveUserQuota"
resp = _post_json(url, body, access_token, user_agent_model=user_agent_model)
raw_buckets = resp.get("buckets") or []
buckets: List[QuotaBucket] = []
if not isinstance(raw_buckets, list):
return buckets
for b in raw_buckets:
if not isinstance(b, dict):
continue
buckets.append(QuotaBucket(
model_id=str(b.get("modelId") or ""),
token_type=str(b.get("tokenType") or ""),
remaining_fraction=float(b.get("remainingFraction") or 0.0),
reset_time_iso=str(b.get("resetTime") or ""),
raw=b,
))
return buckets
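# Example bucket (field names from the parser above; values illustrative):
#   {"modelId": "gemini-2.5-pro", "tokenType": "INPUT",
#    "remainingFraction": 0.42, "resetTime": "2026-04-17T07:00:00Z"}
# becomes QuotaBucket(model_id="gemini-2.5-pro", token_type="INPUT",
# remaining_fraction=0.42, reset_time_iso="2026-04-17T07:00:00Z").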
# =============================================================================
# Project context resolution
# =============================================================================
@dataclass
class ProjectContext:
"""Resolved state for a given OAuth session."""
project_id: str = "" # effective project id sent on requests
managed_project_id: str = "" # Google-assigned project (free tier)
tier_id: str = ""
source: str = "" # "env", "config", "discovered", "onboarded"
def resolve_project_context(
access_token: str,
*,
configured_project_id: str = "",
env_project_id: str = "",
user_agent_model: str = "",
) -> ProjectContext:
"""Figure out what project id + tier to use for requests.
Priority:
1. If configured_project_id or env_project_id is set, use that directly
and short-circuit (no discovery needed).
2. Otherwise call loadCodeAssist to see what Google says.
3. If no tier assigned yet, onboard the user (free tier default).
"""
# Short-circuit: caller provided a project id
if configured_project_id:
return ProjectContext(
project_id=configured_project_id,
tier_id=STANDARD_TIER_ID, # assume paid since they specified one
source="config",
)
if env_project_id:
return ProjectContext(
project_id=env_project_id,
tier_id=STANDARD_TIER_ID,
source="env",
)
# Discover via loadCodeAssist
info = load_code_assist(access_token, user_agent_model=user_agent_model)
effective_project = info.cloudaicompanion_project
tier = info.current_tier_id
if not tier:
# User hasn't been onboarded — provision them on free tier
onboard_resp = onboard_user(
access_token,
tier_id=FREE_TIER_ID,
project_id="",
user_agent_model=user_agent_model,
)
# Re-parse from the onboard response
response_body = onboard_resp.get("response") or {}
if isinstance(response_body, dict):
effective_project = (
effective_project
or str(response_body.get("cloudaicompanionProject") or "")
)
tier = FREE_TIER_ID
source = "onboarded"
else:
source = "discovered"
return ProjectContext(
project_id=effective_project,
managed_project_id=effective_project if tier == FREE_TIER_ID else "",
tier_id=tier,
source=source,
)

agent/google_oauth.py (new file, 1048 lines)

File diff suppressed because it is too large.

cli.py (240 changed lines)

@@ -5028,6 +5028,52 @@ class HermesCLI:
return "\n".join(p for p in parts if p)
return str(value)
def _handle_gquota_command(self, cmd_original: str) -> None:
"""Show Google Gemini Code Assist quota usage for the current OAuth account."""
try:
from agent.google_oauth import get_valid_access_token, GoogleOAuthError, load_credentials
from agent.google_code_assist import retrieve_user_quota, CodeAssistError
except ImportError as exc:
self.console.print(f" [red]Gemini modules unavailable: {exc}[/]")
return
try:
access_token = get_valid_access_token()
except GoogleOAuthError as exc:
self.console.print(f" [yellow]{exc}[/]")
self.console.print(" Run [bold]/model[/] and pick 'Google Gemini (OAuth)' to sign in.")
return
creds = load_credentials()
project_id = (creds.project_id if creds else "") or ""
try:
buckets = retrieve_user_quota(access_token, project_id=project_id)
except CodeAssistError as exc:
self.console.print(f" [red]Quota lookup failed:[/] {exc}")
return
if not buckets:
self.console.print(" [dim]No quota buckets reported (account may be on legacy/unmetered tier).[/]")
return
# Sort for stable display, group by model
buckets.sort(key=lambda b: (b.model_id, b.token_type))
self.console.print()
self.console.print(f" [bold]Gemini Code Assist quota[/] (project: {project_id or '(auto / free-tier)'})")
self.console.print()
for b in buckets:
pct = max(0.0, min(1.0, b.remaining_fraction))
width = 20
filled = int(round(pct * width))
bar = "" * filled + "" * (width - filled)
pct_str = f"{int(pct * 100):3d}%"
header = b.model_id
if b.token_type:
header += f" [{b.token_type}]"
self.console.print(f" {header:40s} {bar} {pct_str}")
self.console.print()
def _handle_personality_command(self, cmd: str):
"""Handle the /personality command to set predefined personalities."""
parts = cmd.split(maxsplit=1)
@@ -5537,6 +5583,8 @@ class HermesCLI:
self._handle_model_switch(cmd_original)
elif canonical == "provider":
self._show_model_and_providers()
elif canonical == "gquota":
self._handle_gquota_command(cmd_original)
elif canonical == "personality":
# Use original case (handler lowercases the personality name itself)
@@ -7519,7 +7567,15 @@ class HermesCLI:
self._invalidate()
def _get_approval_display_fragments(self):
"""Render the dangerous-command approval panel for the prompt_toolkit UI."""
"""Render the dangerous-command approval panel for the prompt_toolkit UI.
Layout priority: title + command + choices must always render, even if
the terminal is short or the description is long. Description is placed
at the bottom of the panel and gets truncated to fit the remaining row
budget. This prevents HSplit from clipping approve/deny off-screen when
tirith findings produce multi-paragraph descriptions or when the user
runs in a compact terminal pane.
"""
state = self._approval_state
if not state:
return []
@@ -7578,22 +7634,89 @@ class HermesCLI:
box_width = _panel_box_width(title, preview_lines)
inner_text_width = max(8, box_width - 2)
# Pre-wrap the mandatory content — command + choices must always render.
cmd_wrapped = _wrap_panel_text(cmd_display, inner_text_width)
# (choice_index, wrapped_line) so we can re-apply selected styling below
choice_wrapped: list[tuple[int, str]] = []
for i, choice in enumerate(choices):
label = choice_labels.get(choice, choice)
prefix = '▸ ' if i == selected else '  '
for wrapped in _wrap_panel_text(f"{prefix}{label}", inner_text_width, subsequent_indent=" "):
choice_wrapped.append((i, wrapped))
# Budget vertical space so HSplit never clips the command or choices.
# Panel chrome (full layout with separators):
# top border + title + blank_after_title
# + blank_between_cmd_choices + bottom border = 5 rows.
# In tight terminals we collapse to:
# top border + title + bottom border = 3 rows (no blanks).
#
# reserved_below: rows consumed below the approval panel by the
# spinner/tool-progress line, status bar, input area, separators, and
# prompt symbol. Measured at ~6 rows during live PTY approval prompts;
# budget 6 so we don't overestimate the panel's room.
term_rows = shutil.get_terminal_size((100, 24)).lines
chrome_full = 5
chrome_tight = 3
reserved_below = 6
available = max(0, term_rows - reserved_below)
mandatory_full = chrome_full + len(cmd_wrapped) + len(choice_wrapped)
# If the full-chrome panel doesn't fit, drop the separator blanks.
# This keeps the command and every choice on-screen in compact terminals.
use_compact_chrome = mandatory_full > available
chrome_rows = chrome_tight if use_compact_chrome else chrome_full
# If the command itself is too long to leave room for choices (e.g. user
# hit "view" on a multi-hundred-character command), truncate it so the
# approve/deny buttons still render. Keep at least 1 row of command.
max_cmd_rows = max(1, available - chrome_rows - len(choice_wrapped))
if len(cmd_wrapped) > max_cmd_rows:
keep = max(1, max_cmd_rows - 1) if max_cmd_rows > 1 else 1
cmd_wrapped = cmd_wrapped[:keep] + ["… (command truncated — use /logs or /debug for full text)"]
# Allocate any remaining rows to description. The extra -1 in full mode
# accounts for the blank separator between choices and description.
mandatory_no_desc = chrome_rows + len(cmd_wrapped) + len(choice_wrapped)
desc_sep_cost = 0 if use_compact_chrome else 1
available_for_desc = available - mandatory_no_desc - desc_sep_cost
# Even on huge terminals, cap description height so the panel stays compact.
available_for_desc = max(0, min(available_for_desc, 10))
desc_wrapped = _wrap_panel_text(description, inner_text_width) if description else []
if available_for_desc < 1 or not desc_wrapped:
desc_wrapped = []
elif len(desc_wrapped) > available_for_desc:
keep = max(1, available_for_desc - 1)
desc_wrapped = desc_wrapped[:keep] + ["… (description truncated)"]
# Render: title → command → choices → description (description last so
# any remaining overflow clips from the bottom of the least-critical
# content, never from the command or choices). Use compact chrome (no
# blank separators) when the terminal is tight.
lines = []
lines.append(('class:approval-border', '╭' + ('─' * box_width) + '\n'))
_append_panel_line(lines, 'class:approval-border', 'class:approval-title', title, box_width)
_append_blank_panel_line(lines, 'class:approval-border', box_width)
for wrapped in _wrap_panel_text(description, inner_text_width):
_append_panel_line(lines, 'class:approval-border', 'class:approval-desc', wrapped, box_width)
for wrapped in _wrap_panel_text(cmd_display, inner_text_width):
if not use_compact_chrome:
_append_blank_panel_line(lines, 'class:approval-border', box_width)
for wrapped in cmd_wrapped:
_append_panel_line(lines, 'class:approval-border', 'class:approval-cmd', wrapped, box_width)
_append_blank_panel_line(lines, 'class:approval-border', box_width)
for i, choice in enumerate(choices):
label = choice_labels.get(choice, choice)
if not use_compact_chrome:
_append_blank_panel_line(lines, 'class:approval-border', box_width)
for i, wrapped in choice_wrapped:
style = 'class:approval-selected' if i == selected else 'class:approval-choice'
prefix = '▸ ' if i == selected else '  '
for wrapped in _wrap_panel_text(f"{prefix}{label}", inner_text_width, subsequent_indent=" "):
_append_panel_line(lines, 'class:approval-border', style, wrapped, box_width)
_append_blank_panel_line(lines, 'class:approval-border', box_width)
_append_panel_line(lines, 'class:approval-border', style, wrapped, box_width)
if desc_wrapped:
if not use_compact_chrome:
_append_blank_panel_line(lines, 'class:approval-border', box_width)
for wrapped in desc_wrapped:
_append_panel_line(lines, 'class:approval-border', 'class:approval-desc', wrapped, box_width)
lines.append(('class:approval-border', '╰' + ('─' * box_width) + '\n'))
return lines
@@ -9245,7 +9368,13 @@ class HermesCLI:
lines.append((border_style, "" + (" " * box_width) + "\n"))
def _get_clarify_display():
"""Build styled text for the clarify question/choices panel."""
"""Build styled text for the clarify question/choices panel.
Layout priority: choices + Other option must always render even if
the question is very long. The question is budgeted to leave enough
rows for the choices and trailing chrome; anything over the budget
is truncated with a marker.
"""
state = cli_ref._clarify_state
if not state:
return []
@@ -9266,48 +9395,97 @@ class HermesCLI:
box_width = _panel_box_width("Hermes needs your input", preview_lines)
inner_text_width = max(8, box_width - 2)
# Pre-wrap choices + Other option — these are mandatory.
choice_wrapped: list[tuple[int, str]] = []
if choices:
for i, choice in enumerate(choices):
prefix = '▸ ' if i == selected and not cli_ref._clarify_freetext else '  '
for wrapped in _wrap_panel_text(f"{prefix}{choice}", inner_text_width, subsequent_indent=" "):
choice_wrapped.append((i, wrapped))
# Trailing Other row(s)
other_idx = len(choices)
if selected == other_idx and not cli_ref._clarify_freetext:
other_label_mand = '▸ Other (type your answer)'
elif cli_ref._clarify_freetext:
other_label_mand = ' Other (type below)'
else:
other_label_mand = ' Other (type your answer)'
other_wrapped = _wrap_panel_text(other_label_mand, inner_text_width, subsequent_indent=" ")
elif cli_ref._clarify_freetext:
# Freetext-only mode: the guidance line takes the place of choices.
other_wrapped = _wrap_panel_text(
"Type your answer in the prompt below, then press Enter.",
inner_text_width,
)
else:
other_wrapped = []
# Budget the question so mandatory rows always render.
# Chrome layouts:
# full : top border + blank_after_title + blank_after_question
# + blank_before_bottom + bottom border = 5 rows
# tight: top border + bottom border = 2 rows (drop all blanks)
#
# reserved_below matches the approval-panel budget (~6 rows for
# spinner/tool-progress + status + input + separators + prompt).
term_rows = shutil.get_terminal_size((100, 24)).lines
chrome_full = 5
chrome_tight = 2
reserved_below = 6
available = max(0, term_rows - reserved_below)
mandatory_full = chrome_full + len(choice_wrapped) + len(other_wrapped)
use_compact_chrome = mandatory_full > available
chrome_rows = chrome_tight if use_compact_chrome else chrome_full
max_question_rows = max(1, available - chrome_rows - len(choice_wrapped) - len(other_wrapped))
max_question_rows = min(max_question_rows, 12) # soft cap on huge terminals
question_wrapped = _wrap_panel_text(question, inner_text_width)
if len(question_wrapped) > max_question_rows:
keep = max(1, max_question_rows - 1)
question_wrapped = question_wrapped[:keep] + ["… (question truncated)"]
lines = []
# Box top border
lines.append(('class:clarify-border', '╭─ '))
lines.append(('class:clarify-title', 'Hermes needs your input'))
lines.append(('class:clarify-border', ' ' + ('─' * max(0, box_width - len("Hermes needs your input") - 3)) + '\n'))
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
if not use_compact_chrome:
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
# Question text
for wrapped in _wrap_panel_text(question, inner_text_width):
# Question text (bounded)
for wrapped in question_wrapped:
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-question', wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
if not use_compact_chrome:
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
if cli_ref._clarify_freetext and not choices:
guidance = "Type your answer in the prompt below, then press Enter."
for wrapped in _wrap_panel_text(guidance, inner_text_width):
for wrapped in other_wrapped:
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-choice', wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
if not use_compact_chrome:
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
if choices:
# Multiple-choice mode: show selectable options
for i, choice in enumerate(choices):
for i, wrapped in choice_wrapped:
style = 'class:clarify-selected' if i == selected and not cli_ref._clarify_freetext else 'class:clarify-choice'
prefix = '▸ ' if i == selected and not cli_ref._clarify_freetext else '  '
wrapped_lines = _wrap_panel_text(f"{prefix}{choice}", inner_text_width, subsequent_indent=" ")
for wrapped in wrapped_lines:
_append_panel_line(lines, 'class:clarify-border', style, wrapped, box_width)
_append_panel_line(lines, 'class:clarify-border', style, wrapped, box_width)
# "Other" option (5th line, only shown when choices exist)
# "Other" option (trailing row(s), only shown when choices exist)
other_idx = len(choices)
if selected == other_idx and not cli_ref._clarify_freetext:
other_style = 'class:clarify-selected'
other_label = '▸ Other (type your answer)'
elif cli_ref._clarify_freetext:
other_style = 'class:clarify-active-other'
other_label = ' Other (type below)'
else:
other_style = 'class:clarify-choice'
other_label = ' Other (type your answer)'
for wrapped in _wrap_panel_text(other_label, inner_text_width, subsequent_indent=" "):
for wrapped in other_wrapped:
_append_panel_line(lines, 'class:clarify-border', other_style, wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
if not use_compact_chrome:
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
lines.append(('class:clarify-border', '╰' + ('─' * box_width) + '\n'))
return lines


@@ -1590,6 +1590,8 @@ class BasePlatformAdapter(ABC):
"reset",
"background",
"restart",
"queue",
"q",
):
logger.debug(
"[%s] Command '/%s' bypassing active-session guard for %s",


@@ -235,6 +235,7 @@ class VoiceReceiver:
# Calculate dynamic RTP header size (RFC 9335 / rtpsize mode)
cc = first_byte & 0x0F # CSRC count
has_extension = bool(first_byte & 0x10) # extension bit
has_padding = bool(first_byte & 0x20) # padding bit (RFC 3550 §5.1)
header_size = 12 + (4 * cc) + (4 if has_extension else 0)
if len(data) < header_size + 4: # need at least header + nonce
@@ -278,6 +279,31 @@ class VoiceReceiver:
if ext_data_len and len(decrypted) > ext_data_len:
decrypted = decrypted[ext_data_len:]
# --- Strip RTP padding (RFC 3550 §5.1) ---
# When the P bit is set, the last payload byte holds the count of
# trailing padding bytes (including itself) that must be removed
# before further processing. Skipping this passes padding-contaminated
# bytes into DAVE/Opus and corrupts inbound audio.
if has_padding:
if not decrypted:
if self._packet_debug_count <= 10:
logger.warning(
"RTP padding bit set but no payload (ssrc=%d)", ssrc,
)
return
pad_len = decrypted[-1]
if pad_len == 0 or pad_len > len(decrypted):
if self._packet_debug_count <= 10:
logger.warning(
"Invalid RTP padding length %d for payload size %d (ssrc=%d)",
pad_len, len(decrypted), ssrc,
)
return
decrypted = decrypted[:-pad_len]
if not decrypted:
# Padding consumed entire payload — nothing to decode
return
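# Worked example: a 6-byte payload b"\x01\x02\x03\x00\x00\x04" with the P bit
# set has pad_len == 4 (its last byte), so decrypted[:-4] leaves b"\x01\x02".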
# --- DAVE E2EE decrypt ---
if self._dave_session:
with self._lock:


@@ -78,6 +78,10 @@ QWEN_OAUTH_CLIENT_ID = "f0304373b74a44d2b584a3fb70ca9e56"
QWEN_OAUTH_TOKEN_URL = "https://chat.qwen.ai/api/v1/oauth2/token"
QWEN_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120
# Google Gemini OAuth (google-gemini-cli provider, Cloud Code Assist backend)
DEFAULT_GEMINI_CLOUDCODE_BASE_URL = "cloudcode-pa://google"
GEMINI_OAUTH_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 60 # refresh 60s before expiry
# =============================================================================
# Provider Registry
@@ -122,6 +126,12 @@ PROVIDER_REGISTRY: Dict[str, ProviderConfig] = {
auth_type="oauth_external",
inference_base_url=DEFAULT_QWEN_BASE_URL,
),
"google-gemini-cli": ProviderConfig(
id="google-gemini-cli",
name="Google Gemini (OAuth)",
auth_type="oauth_external",
inference_base_url=DEFAULT_GEMINI_CLOUDCODE_BASE_URL,
),
"copilot": ProviderConfig(
id="copilot",
name="GitHub Copilot",
@@ -939,7 +949,7 @@ def resolve_provider(
"github-copilot-acp": "copilot-acp", "copilot-acp-agent": "copilot-acp",
"aigateway": "ai-gateway", "vercel": "ai-gateway", "vercel-ai-gateway": "ai-gateway",
"opencode": "opencode-zen", "zen": "opencode-zen",
"qwen-portal": "qwen-oauth", "qwen-cli": "qwen-oauth", "qwen-oauth": "qwen-oauth",
"qwen-portal": "qwen-oauth", "qwen-cli": "qwen-oauth", "qwen-oauth": "qwen-oauth", "google-gemini-cli": "google-gemini-cli", "gemini-cli": "google-gemini-cli", "gemini-oauth": "google-gemini-cli",
"hf": "huggingface", "hugging-face": "huggingface", "huggingface-hub": "huggingface",
"mimo": "xiaomi", "xiaomi-mimo": "xiaomi",
"aws": "bedrock", "aws-bedrock": "bedrock", "amazon-bedrock": "bedrock", "amazon": "bedrock",
@@ -1251,6 +1261,83 @@ def get_qwen_auth_status() -> Dict[str, Any]:
}
# =============================================================================
# Google Gemini OAuth (google-gemini-cli) — PKCE flow + Cloud Code Assist.
#
# Tokens live in ~/.hermes/auth/google_oauth.json (managed by agent.google_oauth).
# The `base_url` here is the marker "cloudcode-pa://google" that run_agent.py
# uses to construct a GeminiCloudCodeClient instead of the default OpenAI SDK.
# Actual HTTP traffic goes to https://cloudcode-pa.googleapis.com/v1internal:*.
# =============================================================================
def resolve_gemini_oauth_runtime_credentials(
*,
force_refresh: bool = False,
) -> Dict[str, Any]:
"""Resolve runtime OAuth creds for google-gemini-cli."""
try:
from agent.google_oauth import (
GoogleOAuthError,
_credentials_path,
get_valid_access_token,
load_credentials,
)
except ImportError as exc:
raise AuthError(
f"agent.google_oauth is not importable: {exc}",
provider="google-gemini-cli",
code="google_oauth_module_missing",
) from exc
try:
access_token = get_valid_access_token(force_refresh=force_refresh)
except GoogleOAuthError as exc:
raise AuthError(
str(exc),
provider="google-gemini-cli",
code=exc.code,
) from exc
creds = load_credentials()
base_url = DEFAULT_GEMINI_CLOUDCODE_BASE_URL
return {
"provider": "google-gemini-cli",
"base_url": base_url,
"api_key": access_token,
"source": "google-oauth",
"expires_at_ms": (creds.expires_ms if creds else None),
"auth_file": str(_credentials_path()),
"email": (creds.email if creds else "") or "",
"project_id": (creds.project_id if creds else "") or "",
}
def get_gemini_oauth_auth_status() -> Dict[str, Any]:
"""Return a status dict for `hermes auth list` / `hermes status`."""
try:
from agent.google_oauth import _credentials_path, load_credentials
except ImportError:
return {"logged_in": False, "error": "agent.google_oauth unavailable"}
auth_path = _credentials_path()
creds = load_credentials()
if creds is None or not creds.access_token:
return {
"logged_in": False,
"auth_file": str(auth_path),
"error": "not logged in",
}
return {
"logged_in": True,
"auth_file": str(auth_path),
"source": "google-oauth",
"api_key": creds.access_token,
"expires_at_ms": creds.expires_ms,
"email": creds.email,
"project_id": creds.project_id,
}
# =============================================================================
# SSH / remote session detection
# =============================================================================
@@ -2469,6 +2556,8 @@ def get_auth_status(provider_id: Optional[str] = None) -> Dict[str, Any]:
return get_codex_auth_status()
if target == "qwen-oauth":
return get_qwen_auth_status()
if target == "google-gemini-cli":
return get_gemini_oauth_auth_status()
if target == "copilot-acp":
return get_external_process_provider_status(target)
# API-key providers


@@ -33,7 +33,7 @@ from hermes_constants import OPENROUTER_BASE_URL
# Providers that support OAuth login in addition to API keys.
_OAUTH_CAPABLE_PROVIDERS = {"anthropic", "nous", "openai-codex", "qwen-oauth"}
_OAUTH_CAPABLE_PROVIDERS = {"anthropic", "nous", "openai-codex", "qwen-oauth", "google-gemini-cli"}
def _get_custom_provider_names() -> list:
@@ -148,7 +148,7 @@ def auth_add_command(args) -> None:
if provider.startswith(CUSTOM_POOL_PREFIX):
requested_type = AUTH_TYPE_API_KEY
else:
requested_type = AUTH_TYPE_OAUTH if provider in {"anthropic", "nous", "openai-codex", "qwen-oauth"} else AUTH_TYPE_API_KEY
requested_type = AUTH_TYPE_OAUTH if provider in {"anthropic", "nous", "openai-codex", "qwen-oauth", "google-gemini-cli"} else AUTH_TYPE_API_KEY
pool = load_pool(provider)
@ -254,6 +254,27 @@ def auth_add_command(args) -> None:
print(f'Added {provider} OAuth credential #{len(pool.entries())}: "{entry.label}"')
return
if provider == "google-gemini-cli":
from agent.google_oauth import run_gemini_oauth_login_pure
creds = run_gemini_oauth_login_pure()
label = (getattr(args, "label", None) or "").strip() or (
creds.get("email") or _oauth_default_label(provider, len(pool.entries()) + 1)
)
entry = PooledCredential(
provider=provider,
id=uuid.uuid4().hex[:6],
label=label,
auth_type=AUTH_TYPE_OAUTH,
priority=0,
source=f"{SOURCE_MANUAL}:google_pkce",
access_token=creds["access_token"],
refresh_token=creds.get("refresh_token"),
)
pool.add_entry(entry)
print(f'Added {provider} OAuth credential #{len(pool.entries())}: "{entry.label}"')
return
if provider == "qwen-oauth":
creds = auth_mod.resolve_qwen_runtime_credentials(refresh_if_expiring=False)
label = (getattr(args, "label", None) or "").strip() or label_from_token(


@ -104,6 +104,7 @@ COMMAND_REGISTRY: list[CommandDef] = [
CommandDef("model", "Switch model for this session", "Configuration", args_hint="[model] [--provider name] [--global]"),
CommandDef("provider", "Show available providers and current provider",
"Configuration"),
CommandDef("gquota", "Show Google Gemini Code Assist quota usage", "Info"),
CommandDef("personality", "Set a predefined personality", "Configuration",
args_hint="[name]"),


@ -1002,6 +1002,30 @@ OPTIONAL_ENV_VARS = {
"category": "provider",
"advanced": True,
},
"HERMES_GEMINI_CLIENT_ID": {
"description": "Google OAuth client ID for google-gemini-cli (optional; defaults to Google's public gemini-cli client)",
"prompt": "Google OAuth client ID (optional — leave empty to use the public default)",
"url": "https://console.cloud.google.com/apis/credentials",
"password": False,
"category": "provider",
"advanced": True,
},
"HERMES_GEMINI_CLIENT_SECRET": {
"description": "Google OAuth client secret for google-gemini-cli (optional)",
"prompt": "Google OAuth client secret (optional)",
"url": "https://console.cloud.google.com/apis/credentials",
"password": True,
"category": "provider",
"advanced": True,
},
"HERMES_GEMINI_PROJECT_ID": {
"description": "GCP project ID for paid Gemini tiers (free tier auto-provisions)",
"prompt": "GCP project ID for Gemini OAuth (leave empty for free tier)",
"url": None,
"password": False,
"category": "provider",
"advanced": True,
},
"OPENCODE_ZEN_API_KEY": {
"description": "OpenCode Zen API key (pay-as-you-go access to curated models)",
"prompt": "OpenCode Zen API key",


@ -373,7 +373,11 @@ def run_doctor(args):
print(color("◆ Auth Providers", Colors.CYAN, Colors.BOLD))
try:
from hermes_cli.auth import get_nous_auth_status, get_codex_auth_status
from hermes_cli.auth import (
get_nous_auth_status,
get_codex_auth_status,
get_gemini_oauth_auth_status,
)
nous_status = get_nous_auth_status()
if nous_status.get("logged_in"):
@ -388,6 +392,20 @@ def run_doctor(args):
check_warn("OpenAI Codex auth", "(not logged in)")
if codex_status.get("error"):
check_info(codex_status["error"])
gemini_status = get_gemini_oauth_auth_status()
if gemini_status.get("logged_in"):
email = gemini_status.get("email") or ""
project = gemini_status.get("project_id") or ""
pieces = []
if email:
pieces.append(email)
if project:
pieces.append(f"project={project}")
suffix = f" ({', '.join(pieces)})" if pieces else ""
check_ok("Google Gemini OAuth", f"(logged in{suffix})")
else:
check_warn("Google Gemini OAuth", "(not logged in)")
except Exception as e:
check_warn("Auth provider status", f"(could not check: {e})")


@ -1405,6 +1405,8 @@ def select_provider_and_model(args=None):
_model_flow_openai_codex(config, current_model)
elif selected_provider == "qwen-oauth":
_model_flow_qwen_oauth(config, current_model)
elif selected_provider == "google-gemini-cli":
_model_flow_google_gemini_cli(config, current_model)
elif selected_provider == "copilot-acp":
_model_flow_copilot_acp(config, current_model)
elif selected_provider == "copilot":
@ -1807,6 +1809,76 @@ def _model_flow_qwen_oauth(_config, current_model=""):
print("No change.")
def _model_flow_google_gemini_cli(_config, current_model=""):
"""Google Gemini OAuth (PKCE) via Cloud Code Assist — supports free AND paid tiers.
Flow:
1. Show upfront warning about Google's ToS stance (per opencode-gemini-auth).
2. If creds missing, run PKCE browser OAuth via agent.google_oauth.
3. Resolve project context (env -> config -> auto-discover -> free tier).
4. Prompt user to pick a model.
5. Save to ~/.hermes/config.yaml.
"""
from hermes_cli.auth import (
DEFAULT_GEMINI_CLOUDCODE_BASE_URL,
get_gemini_oauth_auth_status,
resolve_gemini_oauth_runtime_credentials,
_prompt_model_selection,
_save_model_choice,
_update_config_for_provider,
)
from hermes_cli.models import _PROVIDER_MODELS
print()
print("⚠ Google considers using the Gemini CLI OAuth client with third-party")
print(" software a policy violation. Some users have reported account")
print(" restrictions. You can use your own API key via 'gemini' provider")
print(" for the lowest-risk experience.")
print()
try:
proceed = input("Continue with OAuth login? [y/N]: ").strip().lower()
except (EOFError, KeyboardInterrupt):
print("Cancelled.")
return
if proceed not in {"y", "yes"}:
print("Cancelled.")
return
status = get_gemini_oauth_auth_status()
if not status.get("logged_in"):
try:
from agent.google_oauth import resolve_project_id_from_env, start_oauth_flow
env_project = resolve_project_id_from_env()
start_oauth_flow(force_relogin=True, project_id=env_project)
except Exception as exc:
print(f"OAuth login failed: {exc}")
return
# Verify creds resolve + trigger project discovery
try:
creds = resolve_gemini_oauth_runtime_credentials(force_refresh=False)
project_id = creds.get("project_id", "")
if project_id:
print(f" Using GCP project: {project_id}")
else:
print(" No GCP project configured — free tier will be auto-provisioned on first request.")
except Exception as exc:
print(f"Failed to resolve Gemini credentials: {exc}")
return
models = list(_PROVIDER_MODELS.get("google-gemini-cli") or [])
default = current_model or (models[0] if models else "gemini-2.5-flash")
selected = _prompt_model_selection(models, current_model=default)
if selected:
_save_model_choice(selected)
_update_config_for_provider("google-gemini-cli", DEFAULT_GEMINI_CLOUDCODE_BASE_URL)
print(f"Default model set to: {selected} (via Google Gemini OAuth / Code Assist)")
else:
print("No change.")
def _model_flow_custom(config):
"""Custom endpoint: collect URL, API key, and model name.


@ -136,6 +136,11 @@ _PROVIDER_MODELS: dict[str, list[str]] = {
"gemma-4-31b-it",
"gemma-4-26b-it",
],
"google-gemini-cli": [
"gemini-2.5-pro",
"gemini-2.5-flash",
"gemini-2.5-flash-lite",
],
"zai": [
"glm-5.1",
"glm-5",
@ -244,6 +249,7 @@ _PROVIDER_MODELS: dict[str, list[str]] = {
"big-pickle",
],
"opencode-go": [
"glm-5.1",
"glm-5",
"kimi-k2.5",
"mimo-v2-pro",
@ -534,6 +540,7 @@ CANONICAL_PROVIDERS: list[ProviderEntry] = [
ProviderEntry("copilot-acp", "GitHub Copilot ACP", "GitHub Copilot ACP (spawns `copilot --acp --stdio`)"),
ProviderEntry("huggingface", "Hugging Face", "Hugging Face Inference Providers (20+ open models)"),
ProviderEntry("gemini", "Google AI Studio", "Google AI Studio (Gemini models — OpenAI-compatible endpoint)"),
ProviderEntry("google-gemini-cli", "Google Gemini (OAuth)", "Google Gemini via OAuth + Code Assist (free tier supported; no API key needed)"),
ProviderEntry("deepseek", "DeepSeek", "DeepSeek (DeepSeek-V3, R1, coder — direct API)"),
ProviderEntry("xai", "xAI", "xAI (Grok models — direct API)"),
ProviderEntry("zai", "Z.AI / GLM", "Z.AI / GLM (Zhipu AI direct API)"),
@ -596,6 +603,8 @@ _PROVIDER_ALIASES = {
"qwen": "alibaba",
"alibaba-cloud": "alibaba",
"qwen-portal": "qwen-oauth",
"gemini-cli": "google-gemini-cli",
"gemini-oauth": "google-gemini-cli",
"hf": "huggingface",
"hugging-face": "huggingface",
"huggingface-hub": "huggingface",


@ -64,6 +64,11 @@ HERMES_OVERLAYS: Dict[str, HermesOverlay] = {
base_url_override="https://portal.qwen.ai/v1",
base_url_env_var="HERMES_QWEN_BASE_URL",
),
"google-gemini-cli": HermesOverlay(
transport="openai_chat",
auth_type="oauth_external",
base_url_override="cloudcode-pa://google",
),
"copilot-acp": HermesOverlay(
transport="codex_responses",
auth_type="external_process",
@ -232,6 +237,11 @@ ALIASES: Dict[str, str] = {
"qwen": "alibaba",
"alibaba-cloud": "alibaba",
# google-gemini-cli (OAuth + Code Assist)
"gemini-cli": "google-gemini-cli",
"gemini-oauth": "google-gemini-cli",
# huggingface
"hf": "huggingface",
"hugging-face": "huggingface",


@ -22,6 +22,7 @@ from hermes_cli.auth import (
resolve_nous_runtime_credentials,
resolve_codex_runtime_credentials,
resolve_qwen_runtime_credentials,
resolve_gemini_oauth_runtime_credentials,
resolve_api_key_provider_credentials,
resolve_external_process_provider_credentials,
has_usable_secret,
@ -156,6 +157,9 @@ def _resolve_runtime_from_pool_entry(
elif provider == "qwen-oauth":
api_mode = "chat_completions"
base_url = base_url or DEFAULT_QWEN_BASE_URL
elif provider == "google-gemini-cli":
api_mode = "chat_completions"
base_url = base_url or "cloudcode-pa://google"
elif provider == "anthropic":
api_mode = "anthropic_messages"
cfg_provider = str(model_cfg.get("provider") or "").strip().lower()
@ -804,6 +808,26 @@ def resolve_runtime_provider(
logger.info("Qwen OAuth credentials failed; "
"falling through to next provider.")
if provider == "google-gemini-cli":
try:
creds = resolve_gemini_oauth_runtime_credentials()
return {
"provider": "google-gemini-cli",
"api_mode": "chat_completions",
"base_url": creds.get("base_url", ""),
"api_key": creds.get("api_key", ""),
"source": creds.get("source", "google-oauth"),
"expires_at_ms": creds.get("expires_at_ms"),
"email": creds.get("email", ""),
"project_id": creds.get("project_id", ""),
"requested_provider": requested_provider,
}
except AuthError:
if requested_provider != "auto":
raise
logger.info("Google Gemini OAuth credentials failed; "
"falling through to next provider.")
if provider == "copilot-acp":
creds = resolve_external_process_provider_credentials(provider)
return {


@ -102,7 +102,7 @@ _DEFAULT_PROVIDER_MODELS = {
"ai-gateway": ["anthropic/claude-opus-4.6", "anthropic/claude-sonnet-4.6", "openai/gpt-5", "google/gemini-3-flash"],
"kilocode": ["anthropic/claude-opus-4.6", "anthropic/claude-sonnet-4.6", "openai/gpt-5.4", "google/gemini-3-pro-preview", "google/gemini-3-flash-preview"],
"opencode-zen": ["gpt-5.4", "gpt-5.3-codex", "claude-sonnet-4-6", "gemini-3-flash", "glm-5", "kimi-k2.5", "minimax-m2.7"],
"opencode-go": ["glm-5", "kimi-k2.5", "mimo-v2-pro", "mimo-v2-omni", "minimax-m2.5", "minimax-m2.7"],
"opencode-go": ["glm-5.1", "glm-5", "kimi-k2.5", "mimo-v2-pro", "mimo-v2-omni", "minimax-m2.5", "minimax-m2.7"],
"huggingface": [
"Qwen/Qwen3.5-397B-A17B", "Qwen/Qwen3-235B-A22B-Thinking-2507",
"Qwen/Qwen3-Coder-480B-A35B-Instruct", "deepseek-ai/DeepSeek-R1-0528",


@ -467,6 +467,7 @@ async def get_status():
"latest_config_version": latest_ver,
"gateway_running": gateway_running,
"gateway_pid": gateway_pid,
"gateway_health_url": _GATEWAY_HEALTH_URL,
"gateway_state": gateway_state,
"gateway_platforms": gateway_platforms,
"gateway_exit_reason": gateway_exit_reason,


@ -4365,6 +4365,22 @@ class AIAgent:
self._client_log_context(),
)
return client
if self.provider == "google-gemini-cli" or str(client_kwargs.get("base_url", "")).startswith("cloudcode-pa://"):
from agent.gemini_cloudcode_adapter import GeminiCloudCodeClient
# Strip OpenAI-specific kwargs the Gemini client doesn't accept
safe_kwargs = {
k: v for k, v in client_kwargs.items()
if k in {"api_key", "base_url", "default_headers", "project_id", "timeout"}
}
client = GeminiCloudCodeClient(**safe_kwargs)
logger.info(
"Gemini Cloud Code Assist client created (%s, shared=%s) %s",
reason,
shared,
self._client_log_context(),
)
return client
client = OpenAI(**client_kwargs)
logger.info(
"OpenAI client created (%s, shared=%s) %s",


@ -227,6 +227,7 @@ AUTHOR_MAP = {
"zzn+pa@zzn.im": "xinbenlv",
"zaynjarvis@gmail.com": "ZaynJarvis",
"zhiheng.liu@bytedance.com": "ZaynJarvis",
"mbelleau@Michels-MacBook-Pro.local": "malaiwah",
}

File diff suppressed because it is too large

@ -141,3 +141,116 @@ class TestCliApprovalUi:
assert "archive-" in rendered
assert "keyring.gpg" in rendered
assert "status=progress" in rendered
def test_approval_display_preserves_command_and_choices_with_long_description(self):
"""Regression: long tirith descriptions used to push approve/deny off-screen.
The panel must always render the command and every choice, even when
the description would otherwise wrap into 10+ lines. The description
gets truncated with a marker instead.
"""
cli = _make_cli_stub()
long_desc = (
"Security scan — [CRITICAL] Destructive shell command with wildcard expansion: "
"The command performs a recursive deletion of log files which may contain "
"audit information relevant to active incident investigations, running services "
"that rely on log files for state, rotated archives, and other system artifacts. "
"Review whether this is intended before approving. Consider whether a targeted "
"deletion with more specific filters would better match the intent."
)
cli._approval_state = {
"command": "rm -rf /var/log/apache2/*.log",
"description": long_desc,
"choices": ["once", "session", "always", "deny"],
"selected": 0,
"response_queue": queue.Queue(),
}
# Simulate a compact terminal where the old unbounded panel would overflow.
import os
with patch("cli.shutil.get_terminal_size",
return_value=os.terminal_size((100, 20))):
fragments = cli._get_approval_display_fragments()
rendered = "".join(text for _style, text in fragments)
# Command must be fully visible (rm -rf /var/log/apache2/*.log is short).
assert "rm -rf /var/log/apache2/*.log" in rendered
# Every choice must render — this is the core bug: approve/deny were
# getting clipped off the bottom of the panel.
assert "Allow once" in rendered
assert "Allow for this session" in rendered
assert "Add to permanent allowlist" in rendered
assert "Deny" in rendered
# The bottom border must render (i.e. the panel is self-contained).
assert rendered.rstrip().endswith("")
# The description gets truncated — marker should appear.
assert "(description truncated)" in rendered
def test_approval_display_skips_description_on_very_short_terminal(self):
"""On a 12-row terminal, only the command and choices have room.
The description is dropped entirely rather than partially shown, so the
choices never get clipped.
"""
cli = _make_cli_stub()
cli._approval_state = {
"command": "rm -rf /var/log/apache2/*.log",
"description": "recursive delete",
"choices": ["once", "session", "always", "deny"],
"selected": 0,
"response_queue": queue.Queue(),
}
import os
with patch("cli.shutil.get_terminal_size",
return_value=os.terminal_size((100, 12))):
fragments = cli._get_approval_display_fragments()
rendered = "".join(text for _style, text in fragments)
# Command visible.
assert "rm -rf /var/log/apache2/*.log" in rendered
# All four choices visible.
for label in ("Allow once", "Allow for this session",
"Add to permanent allowlist", "Deny"):
assert label in rendered, f"choice {label!r} missing"
def test_approval_display_truncates_giant_command_in_view_mode(self):
"""If the user hits /view on a massive command, choices still render.
The command gets truncated with a marker; the description gets dropped
if there's no remaining row budget.
"""
cli = _make_cli_stub()
# 50 lines of command when wrapped at ~64 chars.
giant_cmd = "bash -c 'echo " + ("x" * 3000) + "'"
cli._approval_state = {
"command": giant_cmd,
"description": "shell command via -c/-lc flag",
"choices": ["once", "session", "always", "deny"],
"selected": 0,
"show_full": True,
"response_queue": queue.Queue(),
}
import os
with patch("cli.shutil.get_terminal_size",
return_value=os.terminal_size((100, 24))):
fragments = cli._get_approval_display_fragments()
rendered = "".join(text for _style, text in fragments)
# All four choices visible even with a huge command.
for label in ("Allow once", "Allow for this session",
"Add to permanent allowlist", "Deny"):
assert label in rendered, f"choice {label!r} missing"
# Command got truncated with a marker.
assert "(command truncated" in rendered


@ -200,6 +200,22 @@ class TestCommandBypassActiveSession:
"/background response was not sent back to the user"
)
@pytest.mark.asyncio
async def test_queue_bypasses_guard(self):
"""/queue must bypass so it can queue without interrupting."""
adapter = _make_adapter()
sk = _session_key()
adapter._active_sessions[sk] = asyncio.Event()
await adapter.handle_message(_make_event("/queue follow up"))
assert sk not in adapter._pending_messages, (
"/queue was queued as a pending message instead of being dispatched"
)
assert any("handled:queue" in r for r in adapter.sent_responses), (
"/queue response was not sent back to the user"
)
# ---------------------------------------------------------------------------
# Tests: non-bypass messages still get queued


@ -370,6 +370,8 @@ class TestCopilotNormalization:
assert opencode_model_api_mode("opencode-zen", "minimax-m2.5") == "chat_completions"
def test_opencode_go_api_modes_match_docs(self):
assert opencode_model_api_mode("opencode-go", "glm-5.1") == "chat_completions"
assert opencode_model_api_mode("opencode-go", "opencode-go/glm-5.1") == "chat_completions"
assert opencode_model_api_mode("opencode-go", "glm-5") == "chat_completions"
assert opencode_model_api_mode("opencode-go", "opencode-go/glm-5") == "chat_completions"
assert opencode_model_api_mode("opencode-go", "kimi-k2.5") == "chat_completions"


@ -15,7 +15,7 @@ def test_opencode_go_appears_when_api_key_set():
opencode_go = next((p for p in providers if p["slug"] == "opencode-go"), None)
assert opencode_go is not None, "opencode-go should appear when OPENCODE_GO_API_KEY is set"
assert opencode_go["models"] == ["glm-5", "kimi-k2.5", "mimo-v2-pro", "mimo-v2-omni", "minimax-m2.7", "minimax-m2.5"]
assert opencode_go["models"] == ["glm-5.1", "glm-5", "kimi-k2.5", "mimo-v2-pro", "mimo-v2-omni", "minimax-m2.7", "minimax-m2.5"]
# opencode-go can appear as "built-in" (from PROVIDER_TO_MODELS_DEV when
# models.dev is reachable) or "hermes" (from HERMES_OVERLAYS fallback when
# the API is unavailable, e.g. in CI).


@ -15,9 +15,20 @@ def _args(**overrides):
return Namespace(**base)
def test_cmd_chat_tui_continue_uses_latest_tui_session(monkeypatch):
import hermes_cli.main as main_mod
@pytest.fixture
def main_mod(monkeypatch):
"""cmd_chat entry with the first-run provider-gate stubbed past.
`cmd_chat` now early-exits when no API key is configured (post-merge);
these tests exercise the post-config routing so we fake the check out.
"""
import hermes_cli.main as mod
monkeypatch.setattr(mod, "_has_any_provider_configured", lambda: True)
return mod
def test_cmd_chat_tui_continue_uses_latest_tui_session(monkeypatch, main_mod):
calls = []
captured = {}
@ -40,9 +51,7 @@ def test_cmd_chat_tui_continue_uses_latest_tui_session(monkeypatch):
assert captured["resume"] == "20260408_235959_a1b2c3"
def test_cmd_chat_tui_continue_falls_back_to_latest_cli_session(monkeypatch):
import hermes_cli.main as main_mod
def test_cmd_chat_tui_continue_falls_back_to_latest_cli_session(monkeypatch, main_mod):
calls = []
captured = {}
@ -69,9 +78,7 @@ def test_cmd_chat_tui_continue_falls_back_to_latest_cli_session(monkeypatch):
assert captured["resume"] == "20260408_235959_d4e5f6"
def test_cmd_chat_tui_resume_resolves_title_before_launch(monkeypatch):
import hermes_cli.main as main_mod
def test_cmd_chat_tui_resume_resolves_title_before_launch(monkeypatch, main_mod):
captured = {}
def fake_launch(resume_session_id=None, tui_dev=False):


@ -1122,6 +1122,7 @@ class TestStatusRemoteGateway:
assert data["gateway_running"] is True
assert data["gateway_pid"] == 999
assert data["gateway_state"] == "running"
assert data["gateway_health_url"] == "http://gw:8642"
def test_status_remote_probe_not_attempted_when_local_pid_found(self, monkeypatch):
"""When local PID check succeeds, the remote probe is never called."""
@ -1158,6 +1159,7 @@ class TestStatusRemoteGateway:
assert resp.status_code == 200
data = resp.json()
assert data["gateway_running"] is False
assert data["gateway_health_url"] is None
def test_status_remote_running_null_pid(self, monkeypatch):
"""Remote gateway running but PID not in response — pid should be None."""


@ -73,6 +73,50 @@ def _build_encrypted_rtp_packet(secret_key, opus_payload, ssrc=100, seq=1, times
return header + ciphertext + nonce_counter
def _build_padded_rtp_packet(
secret_key, opus_payload, pad_len, ssrc=100, seq=1, timestamp=960,
declared_pad_len=None, ext_words=0,
):
"""Build a NaCl-encrypted RTP packet with the P bit set and padding appended.
Per RFC 3550 §5.1, the last padding byte declares how many trailing bytes
(including itself) to discard. ``pad_len`` is the actual padding appended;
``declared_pad_len`` lets a test forge a mismatched declared length to
exercise the validation path. ``ext_words`` > 0 also sets the X bit and
prepends a synthetic extension block (4-byte preamble in cleartext header,
ext_words*4 bytes of encrypted extension data prepended to the payload).
"""
if pad_len < 1:
raise ValueError("pad_len must be >= 1 (last byte includes itself)")
declared = pad_len if declared_pad_len is None else declared_pad_len
if declared < 0 or declared > 255:
raise ValueError("declared_pad_len must fit in one byte")
has_extension = ext_words > 0
first_byte = 0xA0 | (0x10 if has_extension else 0) # V=2, P=1, [X=?], CC=0
fixed_header = struct.pack(">BBHII", first_byte, 0x78, seq, timestamp, ssrc)
if has_extension:
# 4-byte extension preamble: 2 bytes "defined by profile" + 2 bytes length-in-words
ext_preamble = struct.pack(">HH", 0xBEDE, ext_words)
header = fixed_header + ext_preamble
ext_data = b"\xab" * (ext_words * 4)
else:
header = fixed_header
ext_data = b""
padding = b"\x00" * (pad_len - 1) + bytes([declared])
plaintext = ext_data + opus_payload + padding
box = nacl.secret.Aead(secret_key)
nonce_counter = struct.pack(">I", seq)
full_nonce = nonce_counter + b"\x00" * 20
enc_msg = box.encrypt(plaintext, header, full_nonce)
ciphertext = enc_msg.ciphertext
return header + ciphertext + nonce_counter
def _make_voice_receiver(secret_key, dave_session=None, bot_ssrc=9999,
allowed_user_ids=None, members=None):
"""Create a VoiceReceiver with real secret key."""
@ -212,6 +256,113 @@ class TestRealNaClWithDAVE:
assert len(receiver._buffers.get(100, b"")) == 0
class TestRTPPaddingStrip:
"""RFC 3550 §5.1 — strip RTP padding before DAVE/Opus decode."""
def test_padded_packet_stripped_and_buffered(self):
"""P bit set → trailing padding stripped → opus payload decoded."""
key = _make_secret_key()
opus_silence = b"\xf8\xff\xfe"
receiver = _make_voice_receiver(key)
# 5 bytes of padding (4 zeros + count byte = 5)
packet = _build_padded_rtp_packet(key, opus_silence, pad_len=5, ssrc=100)
receiver._on_packet(packet)
assert 100 in receiver._buffers
assert len(receiver._buffers[100]) > 0
def test_padded_packet_matches_unpadded_output(self):
"""Same opus payload with/without padding → same decoded PCM."""
key = _make_secret_key()
opus_silence = b"\xf8\xff\xfe"
recv_plain = _make_voice_receiver(key)
recv_plain._on_packet(
_build_encrypted_rtp_packet(key, opus_silence, ssrc=100)
)
recv_padded = _make_voice_receiver(key)
recv_padded._on_packet(
_build_padded_rtp_packet(key, opus_silence, pad_len=7, ssrc=100)
)
assert bytes(recv_plain._buffers[100]) == bytes(recv_padded._buffers[100])
def test_padding_with_dave_passthrough(self):
"""Padding stripped before DAVE → passthrough buffers cleanly."""
key = _make_secret_key()
opus_silence = b"\xf8\xff\xfe"
dave = MagicMock() # SSRC unmapped → DAVE skipped, passthrough used
receiver = _make_voice_receiver(key, dave_session=dave)
packet = _build_padded_rtp_packet(key, opus_silence, pad_len=4, ssrc=100)
receiver._on_packet(packet)
dave.decrypt.assert_not_called()
assert 100 in receiver._buffers
assert len(receiver._buffers[100]) > 0
def test_invalid_padding_length_zero_dropped(self):
"""Declared pad_len=0 is invalid (RFC requires count includes itself)."""
key = _make_secret_key()
opus_silence = b"\xf8\xff\xfe"
receiver = _make_voice_receiver(key)
packet = _build_padded_rtp_packet(
key, opus_silence, pad_len=4, declared_pad_len=0, ssrc=100
)
receiver._on_packet(packet)
assert len(receiver._buffers.get(100, b"")) == 0
def test_invalid_padding_length_overflow_dropped(self):
"""Declared pad_len > payload size → packet dropped."""
key = _make_secret_key()
opus_silence = b"\xf8\xff\xfe"
receiver = _make_voice_receiver(key)
packet = _build_padded_rtp_packet(
key, opus_silence, pad_len=4, declared_pad_len=255, ssrc=100
)
receiver._on_packet(packet)
assert len(receiver._buffers.get(100, b"")) == 0
def test_padding_consuming_entire_payload_dropped(self):
"""Padding consumes entire payload → no opus data → dropped."""
key = _make_secret_key()
receiver = _make_voice_receiver(key)
# Empty opus payload, 6 bytes of padding (count byte declares 6)
packet = _build_padded_rtp_packet(key, b"", pad_len=6, ssrc=100)
receiver._on_packet(packet)
assert len(receiver._buffers.get(100, b"")) == 0
def test_padding_with_extension_stripped_correctly(self):
"""X+P bits both set → strip extension from start, padding from end."""
key = _make_secret_key()
opus_silence = b"\xf8\xff\xfe"
# Same opus payload sent two ways: plain, and with both ext+padding
recv_plain = _make_voice_receiver(key)
recv_plain._on_packet(
_build_encrypted_rtp_packet(key, opus_silence, ssrc=100)
)
recv_ext_pad = _make_voice_receiver(key)
recv_ext_pad._on_packet(
_build_padded_rtp_packet(
key, opus_silence, pad_len=5, ext_words=2, ssrc=100
)
)
# Both must yield identical decoded PCM — ext data and padding both
# stripped before opus decode.
assert bytes(recv_plain._buffers[100]) == bytes(recv_ext_pad._buffers[100])
class TestFullVoiceFlow:
"""End-to-end: encrypt → receive → buffer → silence detect → complete."""


@ -0,0 +1,186 @@
"""Regression guardrail: sequential _create_openai_client calls must not
share a closed transport across invocations.
This is the behavioral twin of test_create_openai_client_kwargs_isolation.py.
That test pins "don't mutate input kwargs" at the syntactic level: it catches
#10933 specifically because the bug mutated ``client_kwargs`` in place. This
test pins the user-visible invariant at the behavioral level: no matter HOW a
future keepalive / transport reimplementation plumbs sockets in, the Nth call
to ``_create_openai_client`` must not hand back a client wrapping a
now-closed httpx transport from an earlier call.
AlexKucera's Discord report (2026-04-16): after ``hermes update`` pulled
#10933, the first chat on a session worked, every subsequent chat failed
with ``APIConnectionError('Connection error.')`` whose cause was
``RuntimeError: Cannot send a request, as the client has been closed``.
That is the exact scenario this test reproduces at object level without a
network, so it runs in CI on every PR.
"""
from unittest.mock import MagicMock, patch
from run_agent import AIAgent
def _make_agent():
return AIAgent(
model="test/model",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
def _make_fake_openai_factory(constructed):
"""Return a fake ``OpenAI`` class that records every constructed instance
along with whatever ``http_client`` it was handed (or ``None`` if the
caller did not inject one).
The fake also forwards ``.close()`` calls down to the http_client if one
is present, mirroring what the real OpenAI SDK does during teardown and
what would expose the #10933 bug.
"""
class _FakeOpenAI:
def __init__(self, **kwargs):
self._kwargs = kwargs
self._http_client = kwargs.get("http_client")
self._closed = False
constructed.append(self)
def close(self):
self._closed = True
hc = self._http_client
if hc is not None and hasattr(hc, "close"):
try:
hc.close()
except Exception:
pass
return _FakeOpenAI
def test_second_create_does_not_wrap_closed_transport_from_first():
"""Back-to-back _create_openai_client calls on the same _client_kwargs
must not hand call N a closed http_client from call N-1.
The bug class: call 1 injects an httpx.Client into self._client_kwargs,
client 1 closes (SDK teardown), its http_client closes with it, call 2
reads the SAME now-closed http_client from self._client_kwargs and wraps
it. Every request through client 2 then fails.
"""
agent = _make_agent()
constructed: list = []
fake_openai = _make_fake_openai_factory(constructed)
# Seed a baseline kwargs dict resembling real runtime state.
agent._client_kwargs = {
"api_key": "test-key-value",
"base_url": "https://api.example.com/v1",
}
with patch("run_agent.OpenAI", fake_openai):
# Call 1 — what _replace_primary_openai_client does at init/rebuild.
client_a = agent._create_openai_client(
agent._client_kwargs, reason="initial", shared=True
)
# Simulate the SDK teardown that follows a rebuild: the old client's
# close() is invoked, which closes its underlying http_client if one
# was injected. This is exactly what _replace_primary_openai_client
# does via _close_openai_client after a successful rebuild.
client_a.close()
# Call 2 — the rebuild path. This is where #10933 crashed on the
# next real request.
client_b = agent._create_openai_client(
agent._client_kwargs, reason="rebuild", shared=True
)
assert len(constructed) == 2, f"expected 2 OpenAI constructions, got {len(constructed)}"
assert constructed[0] is client_a
assert constructed[1] is client_b
hc_a = constructed[0]._http_client
hc_b = constructed[1]._http_client
# If the implementation does not inject http_client at all, we're safely
# past the bug class — nothing to share, nothing to close. That's fine.
if hc_a is None and hc_b is None:
return
# If ANY http_client is injected, the two calls MUST NOT share the same
# object, because call 1's object was closed between calls.
if hc_a is not None and hc_b is not None:
assert hc_a is not hc_b, (
"Regression of #10933: _create_openai_client handed the same "
"http_client to two sequential constructions. After the first "
"client is closed (normal SDK teardown on rebuild), the second "
"wraps a closed transport and every subsequent chat raises "
"'Cannot send a request, as the client has been closed'."
)
# And whatever http_client the LATEST call handed out must not be closed
# already. This catches implementations that cache the injected client on
# ``self`` (under any attribute name) and rebuild the SDK client around
# it even after the previous SDK teardown closed the cached transport.
if hc_b is not None:
is_closed_attr = getattr(hc_b, "is_closed", None)
if is_closed_attr is not None:
assert not is_closed_attr, (
"Regression of #10933: second _create_openai_client returned "
"a client whose http_client is already closed. New chats on "
"this session will fail with 'Cannot send a request, as the "
"client has been closed'."
)
def test_replace_primary_openai_client_survives_repeated_rebuilds():
"""Full rebuild path: exercise _replace_primary_openai_client three times
back-to-back and confirm every resulting ``self.client`` is a fresh,
usable construction rather than a wrapper around a previously-closed
transport.
_replace_primary_openai_client is the real rebuild entrypoint; it is
what runs on 401 credential refresh, pool rotation, and model switch.
If a future keepalive tweak stores state on ``self`` between calls,
this test is what notices.
"""
agent = _make_agent()
constructed: list = []
fake_openai = _make_fake_openai_factory(constructed)
agent._client_kwargs = {
"api_key": "test-key-value",
"base_url": "https://api.example.com/v1",
}
with patch("run_agent.OpenAI", fake_openai):
# Seed the initial client so _replace has something to tear down.
agent.client = agent._create_openai_client(
agent._client_kwargs, reason="seed", shared=True
)
# Three rebuilds in a row. Each one must install a fresh live client.
for label in ("rebuild_1", "rebuild_2", "rebuild_3"):
ok = agent._replace_primary_openai_client(reason=label)
assert ok, f"rebuild {label} returned False"
cur = agent.client
assert not cur._closed, (
f"after rebuild {label}, self.client is already closed — "
"this breaks the very next chat turn"
)
hc = cur._http_client
if hc is not None:
is_closed_attr = getattr(hc, "is_closed", None)
if is_closed_attr is not None:
assert not is_closed_attr, (
f"after rebuild {label}, self.client.http_client is "
"closed — reproduces #10933 (AlexKucera report, "
"Discord 2026-04-16)"
)
# All four constructions (seed + 3 rebuilds) should be distinct objects.
# If two are the same, the rebuild is caching the SDK client across
# teardown, which also reproduces the bug class.
assert len({id(c) for c in constructed}) == len(constructed), (
"Some _create_openai_client calls returned the same object across "
"a teardown — rebuild is not producing fresh clients"
)


@ -0,0 +1,137 @@
"""Live regression guardrail for the keepalive/transport bug class (#10933).
AlexKucera reported on Discord (2026-04-16) that after ``hermes update`` pulled
#10933, the FIRST chat in a session worked and EVERY subsequent chat failed
with ``APIConnectionError('Connection error.')`` whose cause was
``RuntimeError: Cannot send a request, as the client has been closed``.
The companion ``test_create_openai_client_reuse.py`` pins this contract at
object level with mocked ``OpenAI``. This file runs the same shape of
reproduction against a real provider so we have a true end-to-end smoke test
for any future keepalive / transport plumbing.
Opt-in; not part of default CI:
HERMES_LIVE_TESTS=1 pytest tests/run_agent/test_sequential_chats_live.py -v
Requires ``OPENROUTER_API_KEY`` to be set (or sourced via ~/.hermes/.env).
"""
from __future__ import annotations
import os
from pathlib import Path
import pytest
# Load ~/.hermes/.env so live runs pick up OPENROUTER_API_KEY without
# needing the runner to shell-source it first. Silent if the file is absent.
def _load_user_env() -> None:
env_file = Path.home() / ".hermes" / ".env"
if not env_file.exists():
return
for raw in env_file.read_text().splitlines():
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, v = line.split("=", 1)
k = k.strip()
v = v.strip().strip('"').strip("'")
# Don't clobber an already-set env var — lets the caller override.
os.environ.setdefault(k, v)
_load_user_env()
LIVE = os.environ.get("HERMES_LIVE_TESTS") == "1"
OR_KEY = os.environ.get("OPENROUTER_API_KEY", "")
pytestmark = [
pytest.mark.skipif(not LIVE, reason="live-only — set HERMES_LIVE_TESTS=1"),
pytest.mark.skipif(not OR_KEY, reason="OPENROUTER_API_KEY not configured"),
]
# Cheap, fast, tool-capable. Swap if it ever goes dark.
LIVE_MODEL = "google/gemini-2.5-flash"
def _make_live_agent():
from run_agent import AIAgent
return AIAgent(
model=LIVE_MODEL,
provider="openrouter",
api_key=OR_KEY,
base_url="https://openrouter.ai/api/v1",
max_iterations=3,
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
# All toolsets off so the agent just produces a single text reply
# per turn — we want to test the HTTP client lifecycle, not tools.
disabled_toolsets=["*"],
)
def _looks_like_error_reply(reply: str) -> tuple[bool, str]:
"""AIAgent returns an error-sentinel string (not an exception) when the
underlying API call fails past retries. A naive ``assert reply and
reply.strip()`` misses this because the sentinel is truthy. This
checker enumerates the known-bad shapes so the live test actually
catches #10933 instead of rubber-stamping the error response.
"""
lowered = reply.lower().strip()
bad_substrings = (
"api call failed",
"connection error",
"client has been closed",
"cannot send a request",
"max retries",
)
for marker in bad_substrings:
if marker in lowered:
return True, marker
return False, ""
def _assert_healthy_reply(reply, turn_label: str) -> None:
assert reply and reply.strip(), f"{turn_label} returned empty: {reply!r}"
is_err, marker = _looks_like_error_reply(reply)
assert not is_err, (
f"{turn_label} returned an error-sentinel string instead of a real "
f"model reply — matched marker {marker!r}. This is the exact shape "
f"of #10933 (AlexKucera Discord report, 2026-04-16): the agent's "
f"retry loop burned three attempts against a closed httpx transport "
f"and surfaced 'API call failed after 3 retries: Connection error.' "
f"to the user. Reply was: {reply!r}"
)
def test_three_sequential_chats_across_client_rebuild():
"""Reproduces AlexKucera's exact failure shape end-to-end.
Turn 1 always worked under #10933. Turn 2 was the one that failed
because the shared httpx transport had been torn down between turns.
Turn 3 is here as extra insurance against any lazy-init shape where
the failure only shows up on call N>=3.
We also deliberately trigger ``_replace_primary_openai_client`` between
turn 2 and turn 3; that is the real rebuild entrypoint (401 refresh,
credential rotation, model switch) and is the path that actually
stored the closed transport into ``self._client_kwargs`` in #10933.
"""
agent = _make_live_agent()
r1 = agent.chat("Respond with only the word: ONE")
_assert_healthy_reply(r1, "turn 1")
r2 = agent.chat("Respond with only the word: TWO")
_assert_healthy_reply(r2, "turn 2")
# Force a client rebuild through the real path — mimics 401 refresh /
# credential rotation / model switch lifecycle.
rebuilt = agent._replace_primary_openai_client(reason="regression_test_rebuild")
assert rebuilt, "rebuild via _replace_primary_openai_client returned False"
r3 = agent.chat("Respond with only the word: THREE")
_assert_healthy_reply(r3, "turn 3 (post-rebuild)")


@ -231,6 +231,9 @@ def test_config_set_personality_resets_history_and_returns_info(monkeypatch):
monkeypatch.setattr(server, "_session_info", lambda agent: {"model": getattr(agent, "model", "?")})
monkeypatch.setattr(server, "_restart_slash_worker", lambda session: None)
monkeypatch.setattr(server, "_emit", lambda *args: emits.append(args))
# _write_config_key writes to ~/.hermes/config.yaml — races with other
# xdist workers that touch the same file. Stub it out.
monkeypatch.setattr(server, "_write_config_key", lambda path, value: None)
resp = server.handle_request(
{"id": "1", "method": "config.set", "params": {"session_id": "sid", "key": "personality", "value": "helpful"}}


@ -165,7 +165,7 @@ def test_session_resume_returns_hydrated_messages(server, monkeypatch):
def reopen_session(self, _sid):
return None
def get_messages(self, _sid):
def get_messages_as_conversation(self, _sid):
return [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "yo"},
@ -193,7 +193,7 @@ def test_session_resume_returns_hydrated_messages(server, monkeypatch):
assert resp["result"]["messages"] == [
{"role": "user", "text": "hello"},
{"role": "assistant", "text": "yo"},
{"role": "tool", "text": "searched"},
{"role": "tool", "name": "tool", "context": ""},
]


@ -17,10 +17,10 @@ export function LanguageSwitcher() {
title={t.language.switchTo}
aria-label={t.language.switchTo}
>
{/* Show the *other* language's flag as the clickable target */}
<span className="text-base leading-none">{locale === "en" ? "🇨🇳" : "🇬🇧"}</span>
{/* Show the *current* language's flag — tooltip advertises the click action */}
<span className="text-base leading-none">{locale === "en" ? "🇬🇧" : "🇨🇳"}</span>
<span className="hidden sm:inline font-display tracking-wide uppercase text-[0.65rem]">
{locale === "en" ? "中文" : "EN"}
{locale === "en" ? "EN" : "中文"}
</span>
</button>
);


@ -213,6 +213,7 @@ export interface StatusResponse {
config_version: number;
env_path: string;
gateway_exit_reason: string | null;
gateway_health_url: string | null;
gateway_pid: number | null;
gateway_platforms: Record<string, PlatformStatus>;
gateway_running: boolean;


@ -53,6 +53,7 @@ export default function StatusPage() {
};
function gatewayValue(): string {
if (status!.gateway_running && status!.gateway_health_url) return status!.gateway_health_url;
if (status!.gateway_running && status!.gateway_pid) return `${t.status.pid} ${status!.gateway_pid}`;
if (status!.gateway_running) return t.status.runningRemote;
if (status!.gateway_state === "startup_failed") return t.status.startFailed;
@ -137,14 +138,14 @@ export default function StatusPage() {
<div className="grid gap-4 sm:grid-cols-3">
{items.map(({ icon: Icon, label, value, badgeText, badgeVariant }) => (
<Card key={label}>
<Card key={label} className="min-w-0 overflow-hidden">
<CardHeader className="flex flex-row items-center justify-between pb-2">
<CardTitle className="text-sm font-medium">{label}</CardTitle>
<Icon className="h-4 w-4 text-muted-foreground" />
</CardHeader>
<CardContent>
<div className="text-2xl font-bold font-display">{value}</div>
<div className="text-2xl font-bold font-display truncate" title={value}>{value}</div>
{badgeText && (
<Badge variant={badgeVariant} className="mt-2">


@ -35,12 +35,99 @@ You need at least one way to connect to an LLM. Use `hermes model` to switch pro
| **DeepSeek** | `DEEPSEEK_API_KEY` in `~/.hermes/.env` (provider: `deepseek`) |
| **Hugging Face** | `HF_TOKEN` in `~/.hermes/.env` (provider: `huggingface`, aliases: `hf`) |
| **Google / Gemini** | `GOOGLE_API_KEY` (or `GEMINI_API_KEY`) in `~/.hermes/.env` (provider: `gemini`) |
| **Google Gemini (OAuth)** | `hermes model` → "Google Gemini (OAuth)" (provider: `google-gemini-cli`, free tier supported, browser PKCE login) |
| **Custom Endpoint** | `hermes model` → choose "Custom endpoint" (saved in `config.yaml`) |
:::tip Model key alias
In the `model:` config section, you can use either `default:` or `model:` as the key name for your model ID. Both `model: { default: my-model }` and `model: { model: my-model }` work identically.
:::
### Google Gemini via OAuth (`google-gemini-cli`)
The `google-gemini-cli` provider uses Google's Cloud Code Assist backend — the
same API that Google's own `gemini-cli` tool uses. This supports both the
**free tier** (generous daily quota for personal accounts) and **paid tiers**
(Standard/Enterprise via a GCP project).
**Quick start:**
```bash
hermes model
# → pick "Google Gemini (OAuth)"
# → see policy warning, confirm
# → browser opens to accounts.google.com, sign in
# → done — Hermes auto-provisions your free tier on first request
```
Hermes ships Google's **public** `gemini-cli` desktop OAuth client by default —
the same credentials Google includes in their open-source `gemini-cli`. Desktop
OAuth clients are not confidential (PKCE provides the security). You do not
need to install `gemini-cli` or register your own GCP OAuth client.
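Under the hood, the PKCE challenge is just a hashed one-time secret. A minimal sketch of the standard RFC 7636 S256 recipe (illustrative only; the real flow lives in `agent.google_oauth`):

```python
# Standard PKCE S256 pair per RFC 7636. Illustrative sketch, not Hermes'
# exact implementation.
import base64
import hashlib
import secrets

verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b"=").decode()
challenge = base64.urlsafe_b64encode(
    hashlib.sha256(verifier.encode("ascii")).digest()
).rstrip(b"=").decode()
# `challenge` goes in the authorize URL (code_challenge, method S256);
# `verifier` is sent with the token exchange to prove possession.
```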
**How auth works:**
- PKCE Authorization Code flow against `accounts.google.com`
- Browser callback at `http://127.0.0.1:8085/oauth2callback` (with ephemeral-port fallback if busy)
- Tokens stored at `~/.hermes/auth/google_oauth.json` (chmod 0600, atomic write, cross-process `fcntl` lock)
- Automatic refresh 60 s before expiry (see the sketch after this list)
- Headless environments (SSH, `HERMES_HEADLESS=1`) → paste-mode fallback
- Inflight refresh deduplication — two concurrent requests won't double-refresh
- `invalid_grant` (revoked refresh) → credential file wiped, user prompted to re-login
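The 60 s refresh rule above boils down to a timestamp check with a safety skew. A minimal sketch (hypothetical helper name; `expires_ms` mirrors the stored credential field):

```python
# Decide whether the stored access token needs a refresh, applying the
# 60 s skew described above. Hypothetical helper; the real logic lives
# in agent.google_oauth.
import time

REFRESH_SKEW_MS = 60_000

def needs_refresh(expires_ms: int | None) -> bool:
    if expires_ms is None:
        return True  # no recorded expiry: refresh defensively
    return int(time.time() * 1000) >= expires_ms - REFRESH_SKEW_MS
```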
**How inference works:**
- Traffic goes to `https://cloudcode-pa.googleapis.com/v1internal:generateContent`
(or `:streamGenerateContent?alt=sse` for streaming), NOT the paid `v1beta/openai` endpoint
- Request body wrapped `{project, model, user_prompt_id, request}` (sketched after this list)
- OpenAI-shaped `messages[]`, `tools[]`, `tool_choice` are translated to Gemini's native
`contents[]`, `tools[].functionDeclarations`, `toolConfig` shape
- Responses translated back to OpenAI shape so the rest of Hermes works unchanged
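A hedged sketch of that envelope construction (field names follow the wrapper documented above; everything else is illustrative):

```python
# Wrap a native Gemini request in the Code Assist envelope described
# above. Illustrative only; agent.gemini_cloudcode_adapter owns the
# real implementation.
import uuid

def wrap_code_assist_request(project: str, model: str, gemini_request: dict) -> dict:
    return {
        "project": project,          # GCP project or auto-provisioned free-tier project
        "model": model,              # e.g. "gemini-2.5-flash"
        "user_prompt_id": uuid.uuid4().hex,
        "request": gemini_request,   # native contents[] / tools[] body
    }
```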
**Tiers & project IDs:**
| Your situation | What to do |
|---|---|
| Personal Google account, want free tier | Nothing — sign in, start chatting |
| Workspace / Standard / Enterprise account | Set `HERMES_GEMINI_PROJECT_ID` or `GOOGLE_CLOUD_PROJECT` to your GCP project ID |
| VPC-SC-protected org | Hermes detects `SECURITY_POLICY_VIOLATED` and forces `standard-tier` automatically |
Free tier auto-provisions a Google-managed project on first use. No GCP setup required; the resolution order is sketched below.
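The resolution order matches the model-picker flow: environment first, then config, then auto-discovery of the free tier. A minimal sketch (hypothetical helper name):

```python
# Project-ID precedence: env vars win, then config, then None (the
# backend auto-provisions a free-tier project). Hypothetical helper.
import os

def pick_project_id(config_project: str | None) -> str | None:
    for var in ("HERMES_GEMINI_PROJECT_ID", "GOOGLE_CLOUD_PROJECT"):
        value = os.environ.get(var)
        if value:
            return value
    return config_project or None  # None -> free-tier auto-provisioning
```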
**Quota monitoring:**
```
/gquota
```
Shows remaining Code Assist quota per model with progress bars:
```
Gemini Code Assist quota (project: 123-abc)
gemini-2.5-pro ▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓░░░░ 85%
gemini-2.5-flash [input] ▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓░░ 92%
```
:::warning Policy risk
Google considers using the Gemini CLI OAuth client with third-party software a
policy violation. Some users have reported account restrictions. For the lowest-risk
experience, use your own API key via the `gemini` provider instead. Hermes shows
an upfront warning and requires explicit confirmation before OAuth begins.
:::
**Custom OAuth client (optional):**
If you'd rather register your own Google OAuth client — e.g., to keep quota
and consent scoped to your own GCP project — set:
```bash
HERMES_GEMINI_CLIENT_ID=your-client.apps.googleusercontent.com
HERMES_GEMINI_CLIENT_SECRET=... # optional for Desktop clients
```
Register a **Desktop app** OAuth client at
[console.cloud.google.com/apis/credentials](https://console.cloud.google.com/apis/credentials)
with the Generative Language API enabled.
:::info Codex Note
The OpenAI Codex provider authenticates via device code (open a URL, enter a code). Hermes stores the resulting credentials in its own auth store under `~/.hermes/auth.json` and can import existing Codex CLI credentials from `~/.codex/auth.json` when present. No Codex CLI installation is required.
:::


@ -47,6 +47,9 @@ All variables go in `~/.hermes/.env`. You can also set them with `hermes config
| `GOOGLE_API_KEY` | Google AI Studio API key ([aistudio.google.com/app/apikey](https://aistudio.google.com/app/apikey)) |
| `GEMINI_API_KEY` | Alias for `GOOGLE_API_KEY` |
| `GEMINI_BASE_URL` | Override Google AI Studio base URL |
| `HERMES_GEMINI_CLIENT_ID` | OAuth client ID for `google-gemini-cli` PKCE login (optional; defaults to Google's public gemini-cli client) |
| `HERMES_GEMINI_CLIENT_SECRET` | OAuth client secret for `google-gemini-cli` (optional) |
| `HERMES_GEMINI_PROJECT_ID` | GCP project ID for paid Gemini tiers (free tier auto-provisions) |
| `ANTHROPIC_API_KEY` | Anthropic Console API key ([console.anthropic.com](https://console.anthropic.com/)) |
| `ANTHROPIC_TOKEN` | Manual or legacy Anthropic OAuth/setup-token override |
| `DASHSCOPE_API_KEY` | Alibaba Cloud DashScope API key for Qwen models ([modelstudio.console.alibabacloud.com](https://modelstudio.console.alibabacloud.com/)) |