mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
Google-side 429 Code Assist errors now flow through Hermes' normal rate-limit
path (status_code on the exception, Retry-After preserved via error.response)
instead of being opaque RuntimeErrors. User sees a one-line capacity message
instead of a 500-char JSON dump.
Changes
- CodeAssistError grows status_code / response / retry_after / details attrs.
_extract_status_code in error_classifier picks up status_code and classifies
429 as FailoverReason.rate_limit, so fallback_providers triggers the same
way it does for SDK errors. run_agent.py line ~10428 already walks
error.response.headers for Retry-After — preserving the response means that
path just works.
- _gemini_http_error parses the Google error envelope (error.status +
error.details[].reason from google.rpc.ErrorInfo, retryDelay from
google.rpc.RetryInfo). MODEL_CAPACITY_EXHAUSTED / RESOURCE_EXHAUSTED / 404
model-not-found each produce a human-readable message; unknown shapes fall
back to the previous raw-body format.
- Drop gemma-4-26b-it from hermes_cli/models.py, hermes_cli/setup.py, and
agent/model_metadata.py — Google returned 404 for it today in local repro.
Kept gemma-4-31b-it (capacity-constrained but not retired).
Validation
| | Before | After |
|---------------------------|--------------------------------|-------------------------------------------|
| Error message | 'Code Assist returned HTTP 429: {500 chars JSON}' | 'Gemini capacity exhausted for gemini-2.5-pro (Google-side throttle...)' |
| status_code on error | None (opaque RuntimeError) | 429 |
| Classifier reason | unknown (string-match fallback) | FailoverReason.rate_limit |
| Retry-After honored | ignored | extracted from RetryInfo or header |
| gemma-4-26b-it picker | advertised (404s on Google) | removed |
Unit + E2E tests cover non-streaming 429, streaming 429, 404 model-not-found,
Retry-After header fallback, malformed body, and classifier integration.
Targeted suites: tests/agent/test_gemini_cloudcode.py (81 tests), full
tests/hermes_cli (2203 tests) green.
Co-authored-by: teknium1 <teknium@nousresearch.com>
895 lines
33 KiB
Python
895 lines
33 KiB
Python
"""OpenAI-compatible facade that talks to Google's Cloud Code Assist backend.
|
|
|
|
This adapter lets Hermes use the ``google-gemini-cli`` provider as if it were
|
|
a standard OpenAI-shaped chat completion endpoint, while the underlying HTTP
|
|
traffic goes to ``cloudcode-pa.googleapis.com/v1internal:{generateContent,
|
|
streamGenerateContent}`` with a Bearer access token obtained via OAuth PKCE.
|
|
|
|
Architecture
|
|
------------
|
|
- ``GeminiCloudCodeClient`` exposes ``.chat.completions.create(**kwargs)``
|
|
mirroring the subset of the OpenAI SDK that ``run_agent.py`` uses.
|
|
- Incoming OpenAI ``messages[]`` / ``tools[]`` / ``tool_choice`` are translated
|
|
to Gemini's native ``contents[]`` / ``tools[].functionDeclarations`` /
|
|
``toolConfig`` / ``systemInstruction`` shape.
|
|
- The request body is wrapped ``{project, model, user_prompt_id, request}``
|
|
per Code Assist API expectations.
|
|
- Responses (``candidates[].content.parts[]``) are converted back to
|
|
OpenAI ``choices[0].message`` shape with ``content`` + ``tool_calls``.
|
|
- Streaming uses SSE (``?alt=sse``) and yields OpenAI-shaped delta chunks.
|
|
|
|
Attribution
|
|
-----------
|
|
Translation semantics follow jenslys/opencode-gemini-auth (MIT) and the public
|
|
Gemini API docs. Request envelope shape
|
|
(``{project, model, user_prompt_id, request}``) is documented nowhere; it is
|
|
reverse-engineered from the opencode-gemini-auth and clawdbot implementations.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
import uuid
|
|
from types import SimpleNamespace
|
|
from typing import Any, Dict, Iterator, List, Optional
|
|
|
|
import httpx
|
|
|
|
from agent import google_oauth
|
|
from agent.google_code_assist import (
|
|
CODE_ASSIST_ENDPOINT,
|
|
FREE_TIER_ID,
|
|
CodeAssistError,
|
|
ProjectContext,
|
|
resolve_project_context,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# =============================================================================
|
|
# Request translation: OpenAI → Gemini
|
|
# =============================================================================
|
|
|
|
_ROLE_MAP_OPENAI_TO_GEMINI = {
|
|
"user": "user",
|
|
"assistant": "model",
|
|
"system": "user", # handled separately via systemInstruction
|
|
"tool": "user", # functionResponse is wrapped in a user-role turn
|
|
"function": "user",
|
|
}
|
|
|
|
|
|
def _coerce_content_to_text(content: Any) -> str:
|
|
"""OpenAI content may be str or a list of parts; reduce to plain text."""
|
|
if content is None:
|
|
return ""
|
|
if isinstance(content, str):
|
|
return content
|
|
if isinstance(content, list):
|
|
pieces: List[str] = []
|
|
for p in content:
|
|
if isinstance(p, str):
|
|
pieces.append(p)
|
|
elif isinstance(p, dict):
|
|
if p.get("type") == "text" and isinstance(p.get("text"), str):
|
|
pieces.append(p["text"])
|
|
# Multimodal (image_url, etc.) — stub for now; log and skip
|
|
elif p.get("type") in ("image_url", "input_audio"):
|
|
logger.debug("Dropping multimodal part (not yet supported): %s", p.get("type"))
|
|
return "\n".join(pieces)
|
|
return str(content)
|
|
|
|
|
|
def _translate_tool_call_to_gemini(tool_call: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""OpenAI tool_call -> Gemini functionCall part."""
|
|
fn = tool_call.get("function") or {}
|
|
args_raw = fn.get("arguments", "")
|
|
try:
|
|
args = json.loads(args_raw) if isinstance(args_raw, str) and args_raw else {}
|
|
except json.JSONDecodeError:
|
|
args = {"_raw": args_raw}
|
|
if not isinstance(args, dict):
|
|
args = {"_value": args}
|
|
return {
|
|
"functionCall": {
|
|
"name": fn.get("name") or "",
|
|
"args": args,
|
|
},
|
|
# Sentinel signature — matches opencode-gemini-auth's approach.
|
|
# Without this, Code Assist rejects function calls that originated
|
|
# outside its own chain.
|
|
"thoughtSignature": "skip_thought_signature_validator",
|
|
}
|
|
|
|
|
|
def _translate_tool_result_to_gemini(message: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""OpenAI tool-role message -> Gemini functionResponse part.
|
|
|
|
The function name isn't in the OpenAI tool message directly; it must be
|
|
passed via the assistant message that issued the call. For simplicity we
|
|
look up ``name`` on the message (OpenAI SDK copies it there) or on the
|
|
``tool_call_id`` cross-reference.
|
|
"""
|
|
name = str(message.get("name") or message.get("tool_call_id") or "tool")
|
|
content = _coerce_content_to_text(message.get("content"))
|
|
# Gemini expects the response as a dict under `response`. We wrap plain
|
|
# text in {"output": "..."}.
|
|
try:
|
|
parsed = json.loads(content) if content.strip().startswith(("{", "[")) else None
|
|
except json.JSONDecodeError:
|
|
parsed = None
|
|
response = parsed if isinstance(parsed, dict) else {"output": content}
|
|
return {
|
|
"functionResponse": {
|
|
"name": name,
|
|
"response": response,
|
|
},
|
|
}
|
|
|
|
|
|
def _build_gemini_contents(
|
|
messages: List[Dict[str, Any]],
|
|
) -> tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
|
|
"""Convert OpenAI messages[] to Gemini contents[] + systemInstruction."""
|
|
system_text_parts: List[str] = []
|
|
contents: List[Dict[str, Any]] = []
|
|
|
|
for msg in messages:
|
|
if not isinstance(msg, dict):
|
|
continue
|
|
role = str(msg.get("role") or "user")
|
|
|
|
if role == "system":
|
|
system_text_parts.append(_coerce_content_to_text(msg.get("content")))
|
|
continue
|
|
|
|
# Tool result message — emit a user-role turn with functionResponse
|
|
if role == "tool" or role == "function":
|
|
contents.append({
|
|
"role": "user",
|
|
"parts": [_translate_tool_result_to_gemini(msg)],
|
|
})
|
|
continue
|
|
|
|
gemini_role = _ROLE_MAP_OPENAI_TO_GEMINI.get(role, "user")
|
|
parts: List[Dict[str, Any]] = []
|
|
|
|
text = _coerce_content_to_text(msg.get("content"))
|
|
if text:
|
|
parts.append({"text": text})
|
|
|
|
# Assistant messages can carry tool_calls
|
|
tool_calls = msg.get("tool_calls") or []
|
|
if isinstance(tool_calls, list):
|
|
for tc in tool_calls:
|
|
if isinstance(tc, dict):
|
|
parts.append(_translate_tool_call_to_gemini(tc))
|
|
|
|
if not parts:
|
|
# Gemini rejects empty parts; skip the turn entirely
|
|
continue
|
|
|
|
contents.append({"role": gemini_role, "parts": parts})
|
|
|
|
system_instruction: Optional[Dict[str, Any]] = None
|
|
joined_system = "\n".join(p for p in system_text_parts if p).strip()
|
|
if joined_system:
|
|
system_instruction = {
|
|
"role": "system",
|
|
"parts": [{"text": joined_system}],
|
|
}
|
|
|
|
return contents, system_instruction
|
|
|
|
|
|
def _translate_tools_to_gemini(tools: Any) -> List[Dict[str, Any]]:
|
|
"""OpenAI tools[] -> Gemini tools[].functionDeclarations[]."""
|
|
if not isinstance(tools, list) or not tools:
|
|
return []
|
|
declarations: List[Dict[str, Any]] = []
|
|
for t in tools:
|
|
if not isinstance(t, dict):
|
|
continue
|
|
fn = t.get("function") or {}
|
|
if not isinstance(fn, dict):
|
|
continue
|
|
name = fn.get("name")
|
|
if not name:
|
|
continue
|
|
decl = {"name": str(name)}
|
|
if fn.get("description"):
|
|
decl["description"] = str(fn["description"])
|
|
params = fn.get("parameters")
|
|
if isinstance(params, dict):
|
|
decl["parameters"] = params
|
|
declarations.append(decl)
|
|
if not declarations:
|
|
return []
|
|
return [{"functionDeclarations": declarations}]
|
|
|
|
|
|
def _translate_tool_choice_to_gemini(tool_choice: Any) -> Optional[Dict[str, Any]]:
|
|
"""OpenAI tool_choice -> Gemini toolConfig.functionCallingConfig."""
|
|
if tool_choice is None:
|
|
return None
|
|
if isinstance(tool_choice, str):
|
|
if tool_choice == "auto":
|
|
return {"functionCallingConfig": {"mode": "AUTO"}}
|
|
if tool_choice == "required":
|
|
return {"functionCallingConfig": {"mode": "ANY"}}
|
|
if tool_choice == "none":
|
|
return {"functionCallingConfig": {"mode": "NONE"}}
|
|
if isinstance(tool_choice, dict):
|
|
fn = tool_choice.get("function") or {}
|
|
name = fn.get("name")
|
|
if name:
|
|
return {
|
|
"functionCallingConfig": {
|
|
"mode": "ANY",
|
|
"allowedFunctionNames": [str(name)],
|
|
},
|
|
}
|
|
return None
|
|
|
|
|
|
def _normalize_thinking_config(config: Any) -> Optional[Dict[str, Any]]:
|
|
"""Accept thinkingBudget / thinkingLevel / includeThoughts (+ snake_case)."""
|
|
if not isinstance(config, dict) or not config:
|
|
return None
|
|
budget = config.get("thinkingBudget", config.get("thinking_budget"))
|
|
level = config.get("thinkingLevel", config.get("thinking_level"))
|
|
include = config.get("includeThoughts", config.get("include_thoughts"))
|
|
normalized: Dict[str, Any] = {}
|
|
if isinstance(budget, (int, float)):
|
|
normalized["thinkingBudget"] = int(budget)
|
|
if isinstance(level, str) and level.strip():
|
|
normalized["thinkingLevel"] = level.strip().lower()
|
|
if isinstance(include, bool):
|
|
normalized["includeThoughts"] = include
|
|
return normalized or None
|
|
|
|
|
|
def build_gemini_request(
|
|
*,
|
|
messages: List[Dict[str, Any]],
|
|
tools: Any = None,
|
|
tool_choice: Any = None,
|
|
temperature: Optional[float] = None,
|
|
max_tokens: Optional[int] = None,
|
|
top_p: Optional[float] = None,
|
|
stop: Any = None,
|
|
thinking_config: Any = None,
|
|
) -> Dict[str, Any]:
|
|
"""Build the inner Gemini request body (goes inside ``request`` wrapper)."""
|
|
contents, system_instruction = _build_gemini_contents(messages)
|
|
|
|
body: Dict[str, Any] = {"contents": contents}
|
|
if system_instruction is not None:
|
|
body["systemInstruction"] = system_instruction
|
|
|
|
gemini_tools = _translate_tools_to_gemini(tools)
|
|
if gemini_tools:
|
|
body["tools"] = gemini_tools
|
|
tool_cfg = _translate_tool_choice_to_gemini(tool_choice)
|
|
if tool_cfg is not None:
|
|
body["toolConfig"] = tool_cfg
|
|
|
|
generation_config: Dict[str, Any] = {}
|
|
if isinstance(temperature, (int, float)):
|
|
generation_config["temperature"] = float(temperature)
|
|
if isinstance(max_tokens, int) and max_tokens > 0:
|
|
generation_config["maxOutputTokens"] = max_tokens
|
|
if isinstance(top_p, (int, float)):
|
|
generation_config["topP"] = float(top_p)
|
|
if isinstance(stop, str) and stop:
|
|
generation_config["stopSequences"] = [stop]
|
|
elif isinstance(stop, list) and stop:
|
|
generation_config["stopSequences"] = [str(s) for s in stop if s]
|
|
normalized_thinking = _normalize_thinking_config(thinking_config)
|
|
if normalized_thinking:
|
|
generation_config["thinkingConfig"] = normalized_thinking
|
|
if generation_config:
|
|
body["generationConfig"] = generation_config
|
|
|
|
return body
|
|
|
|
|
|
def wrap_code_assist_request(
|
|
*,
|
|
project_id: str,
|
|
model: str,
|
|
inner_request: Dict[str, Any],
|
|
user_prompt_id: Optional[str] = None,
|
|
) -> Dict[str, Any]:
|
|
"""Wrap the inner Gemini request in the Code Assist envelope."""
|
|
return {
|
|
"project": project_id,
|
|
"model": model,
|
|
"user_prompt_id": user_prompt_id or str(uuid.uuid4()),
|
|
"request": inner_request,
|
|
}
|
|
|
|
|
|
# =============================================================================
|
|
# Response translation: Gemini → OpenAI
|
|
# =============================================================================
|
|
|
|
def _translate_gemini_response(
|
|
resp: Dict[str, Any],
|
|
model: str,
|
|
) -> SimpleNamespace:
|
|
"""Non-streaming Gemini response -> OpenAI-shaped SimpleNamespace.
|
|
|
|
Code Assist wraps the actual Gemini response inside ``response``, so we
|
|
unwrap it first if present.
|
|
"""
|
|
inner = resp.get("response") if isinstance(resp.get("response"), dict) else resp
|
|
|
|
candidates = inner.get("candidates") or []
|
|
if not isinstance(candidates, list) or not candidates:
|
|
return _empty_response(model)
|
|
|
|
cand = candidates[0]
|
|
content_obj = cand.get("content") if isinstance(cand, dict) else {}
|
|
parts = content_obj.get("parts") if isinstance(content_obj, dict) else []
|
|
|
|
text_pieces: List[str] = []
|
|
reasoning_pieces: List[str] = []
|
|
tool_calls: List[SimpleNamespace] = []
|
|
|
|
for i, part in enumerate(parts or []):
|
|
if not isinstance(part, dict):
|
|
continue
|
|
# Thought parts are model's internal reasoning — surface as reasoning,
|
|
# don't mix into content.
|
|
if part.get("thought") is True:
|
|
if isinstance(part.get("text"), str):
|
|
reasoning_pieces.append(part["text"])
|
|
continue
|
|
if isinstance(part.get("text"), str):
|
|
text_pieces.append(part["text"])
|
|
continue
|
|
fc = part.get("functionCall")
|
|
if isinstance(fc, dict) and fc.get("name"):
|
|
try:
|
|
args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False)
|
|
except (TypeError, ValueError):
|
|
args_str = "{}"
|
|
tool_calls.append(SimpleNamespace(
|
|
id=f"call_{uuid.uuid4().hex[:12]}",
|
|
type="function",
|
|
index=i,
|
|
function=SimpleNamespace(name=str(fc["name"]), arguments=args_str),
|
|
))
|
|
|
|
finish_reason = "tool_calls" if tool_calls else _map_gemini_finish_reason(
|
|
str(cand.get("finishReason") or "")
|
|
)
|
|
|
|
usage_meta = inner.get("usageMetadata") or {}
|
|
usage = SimpleNamespace(
|
|
prompt_tokens=int(usage_meta.get("promptTokenCount") or 0),
|
|
completion_tokens=int(usage_meta.get("candidatesTokenCount") or 0),
|
|
total_tokens=int(usage_meta.get("totalTokenCount") or 0),
|
|
prompt_tokens_details=SimpleNamespace(
|
|
cached_tokens=int(usage_meta.get("cachedContentTokenCount") or 0),
|
|
),
|
|
)
|
|
|
|
message = SimpleNamespace(
|
|
role="assistant",
|
|
content="".join(text_pieces) if text_pieces else None,
|
|
tool_calls=tool_calls or None,
|
|
reasoning="".join(reasoning_pieces) or None,
|
|
reasoning_content="".join(reasoning_pieces) or None,
|
|
reasoning_details=None,
|
|
)
|
|
choice = SimpleNamespace(
|
|
index=0,
|
|
message=message,
|
|
finish_reason=finish_reason,
|
|
)
|
|
return SimpleNamespace(
|
|
id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
|
|
object="chat.completion",
|
|
created=int(time.time()),
|
|
model=model,
|
|
choices=[choice],
|
|
usage=usage,
|
|
)
|
|
|
|
|
|
def _empty_response(model: str) -> SimpleNamespace:
|
|
message = SimpleNamespace(
|
|
role="assistant", content="", tool_calls=None,
|
|
reasoning=None, reasoning_content=None, reasoning_details=None,
|
|
)
|
|
choice = SimpleNamespace(index=0, message=message, finish_reason="stop")
|
|
usage = SimpleNamespace(
|
|
prompt_tokens=0, completion_tokens=0, total_tokens=0,
|
|
prompt_tokens_details=SimpleNamespace(cached_tokens=0),
|
|
)
|
|
return SimpleNamespace(
|
|
id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
|
|
object="chat.completion",
|
|
created=int(time.time()),
|
|
model=model,
|
|
choices=[choice],
|
|
usage=usage,
|
|
)
|
|
|
|
|
|
def _map_gemini_finish_reason(reason: str) -> str:
|
|
mapping = {
|
|
"STOP": "stop",
|
|
"MAX_TOKENS": "length",
|
|
"SAFETY": "content_filter",
|
|
"RECITATION": "content_filter",
|
|
"OTHER": "stop",
|
|
}
|
|
return mapping.get(reason.upper(), "stop")
|
|
|
|
|
|
# =============================================================================
|
|
# Streaming SSE iterator
|
|
# =============================================================================
|
|
|
|
class _GeminiStreamChunk(SimpleNamespace):
|
|
"""Mimics an OpenAI ChatCompletionChunk with .choices[0].delta."""
|
|
pass
|
|
|
|
|
|
def _make_stream_chunk(
|
|
*,
|
|
model: str,
|
|
content: str = "",
|
|
tool_call_delta: Optional[Dict[str, Any]] = None,
|
|
finish_reason: Optional[str] = None,
|
|
reasoning: str = "",
|
|
) -> _GeminiStreamChunk:
|
|
delta_kwargs: Dict[str, Any] = {"role": "assistant"}
|
|
if content:
|
|
delta_kwargs["content"] = content
|
|
if tool_call_delta is not None:
|
|
delta_kwargs["tool_calls"] = [SimpleNamespace(
|
|
index=tool_call_delta.get("index", 0),
|
|
id=tool_call_delta.get("id") or f"call_{uuid.uuid4().hex[:12]}",
|
|
type="function",
|
|
function=SimpleNamespace(
|
|
name=tool_call_delta.get("name") or "",
|
|
arguments=tool_call_delta.get("arguments") or "",
|
|
),
|
|
)]
|
|
if reasoning:
|
|
delta_kwargs["reasoning"] = reasoning
|
|
delta_kwargs["reasoning_content"] = reasoning
|
|
delta = SimpleNamespace(**delta_kwargs)
|
|
choice = SimpleNamespace(index=0, delta=delta, finish_reason=finish_reason)
|
|
return _GeminiStreamChunk(
|
|
id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
|
|
object="chat.completion.chunk",
|
|
created=int(time.time()),
|
|
model=model,
|
|
choices=[choice],
|
|
usage=None,
|
|
)
|
|
|
|
|
|
def _iter_sse_events(response: httpx.Response) -> Iterator[Dict[str, Any]]:
|
|
"""Parse Server-Sent Events from an httpx streaming response."""
|
|
buffer = ""
|
|
for chunk in response.iter_text():
|
|
if not chunk:
|
|
continue
|
|
buffer += chunk
|
|
while "\n" in buffer:
|
|
line, buffer = buffer.split("\n", 1)
|
|
line = line.rstrip("\r")
|
|
if not line:
|
|
continue
|
|
if line.startswith("data: "):
|
|
data = line[6:]
|
|
if data == "[DONE]":
|
|
return
|
|
try:
|
|
yield json.loads(data)
|
|
except json.JSONDecodeError:
|
|
logger.debug("Non-JSON SSE line: %s", data[:200])
|
|
|
|
|
|
def _translate_stream_event(
|
|
event: Dict[str, Any],
|
|
model: str,
|
|
tool_call_indices: Dict[str, int],
|
|
) -> List[_GeminiStreamChunk]:
|
|
"""Unwrap Code Assist envelope and emit OpenAI-shaped chunk(s)."""
|
|
inner = event.get("response") if isinstance(event.get("response"), dict) else event
|
|
candidates = inner.get("candidates") or []
|
|
if not candidates:
|
|
return []
|
|
cand = candidates[0]
|
|
if not isinstance(cand, dict):
|
|
return []
|
|
|
|
chunks: List[_GeminiStreamChunk] = []
|
|
|
|
content = cand.get("content") or {}
|
|
parts = content.get("parts") if isinstance(content, dict) else []
|
|
for part in parts or []:
|
|
if not isinstance(part, dict):
|
|
continue
|
|
if part.get("thought") is True and isinstance(part.get("text"), str):
|
|
chunks.append(_make_stream_chunk(
|
|
model=model, reasoning=part["text"],
|
|
))
|
|
continue
|
|
if isinstance(part.get("text"), str) and part["text"]:
|
|
chunks.append(_make_stream_chunk(model=model, content=part["text"]))
|
|
fc = part.get("functionCall")
|
|
if isinstance(fc, dict) and fc.get("name"):
|
|
name = str(fc["name"])
|
|
idx = tool_call_indices.setdefault(name, len(tool_call_indices))
|
|
try:
|
|
args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False)
|
|
except (TypeError, ValueError):
|
|
args_str = "{}"
|
|
chunks.append(_make_stream_chunk(
|
|
model=model,
|
|
tool_call_delta={
|
|
"index": idx,
|
|
"name": name,
|
|
"arguments": args_str,
|
|
},
|
|
))
|
|
|
|
finish_reason_raw = str(cand.get("finishReason") or "")
|
|
if finish_reason_raw:
|
|
mapped = _map_gemini_finish_reason(finish_reason_raw)
|
|
if tool_call_indices:
|
|
mapped = "tool_calls"
|
|
chunks.append(_make_stream_chunk(model=model, finish_reason=mapped))
|
|
return chunks
|
|
|
|
|
|
# =============================================================================
|
|
# GeminiCloudCodeClient — OpenAI-compatible facade
|
|
# =============================================================================
|
|
|
|
MARKER_BASE_URL = "cloudcode-pa://google"
|
|
|
|
|
|
class _GeminiChatCompletions:
|
|
def __init__(self, client: "GeminiCloudCodeClient"):
|
|
self._client = client
|
|
|
|
def create(self, **kwargs: Any) -> Any:
|
|
return self._client._create_chat_completion(**kwargs)
|
|
|
|
|
|
class _GeminiChatNamespace:
|
|
def __init__(self, client: "GeminiCloudCodeClient"):
|
|
self.completions = _GeminiChatCompletions(client)
|
|
|
|
|
|
class GeminiCloudCodeClient:
|
|
"""Minimal OpenAI-SDK-compatible facade over Code Assist v1internal."""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
api_key: Optional[str] = None,
|
|
base_url: Optional[str] = None,
|
|
default_headers: Optional[Dict[str, str]] = None,
|
|
project_id: str = "",
|
|
**_: Any,
|
|
):
|
|
# `api_key` here is a dummy — real auth is the OAuth access token
|
|
# fetched on every call via agent.google_oauth.get_valid_access_token().
|
|
# We accept the kwarg for openai.OpenAI interface parity.
|
|
self.api_key = api_key or "google-oauth"
|
|
self.base_url = base_url or MARKER_BASE_URL
|
|
self._default_headers = dict(default_headers or {})
|
|
self._configured_project_id = project_id
|
|
self._project_context: Optional[ProjectContext] = None
|
|
self._project_context_lock = False # simple single-thread guard
|
|
self.chat = _GeminiChatNamespace(self)
|
|
self.is_closed = False
|
|
self._http = httpx.Client(timeout=httpx.Timeout(connect=15.0, read=600.0, write=30.0, pool=30.0))
|
|
|
|
def close(self) -> None:
|
|
self.is_closed = True
|
|
try:
|
|
self._http.close()
|
|
except Exception:
|
|
pass
|
|
|
|
# Implement the OpenAI SDK's context-manager-ish closure check
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
self.close()
|
|
|
|
def _ensure_project_context(self, access_token: str, model: str) -> ProjectContext:
|
|
"""Lazily resolve and cache the project context for this client."""
|
|
if self._project_context is not None:
|
|
return self._project_context
|
|
|
|
env_project = google_oauth.resolve_project_id_from_env()
|
|
creds = google_oauth.load_credentials()
|
|
stored_project = creds.project_id if creds else ""
|
|
|
|
# Prefer what's already baked into the creds
|
|
if stored_project:
|
|
self._project_context = ProjectContext(
|
|
project_id=stored_project,
|
|
managed_project_id=creds.managed_project_id if creds else "",
|
|
tier_id="",
|
|
source="stored",
|
|
)
|
|
return self._project_context
|
|
|
|
ctx = resolve_project_context(
|
|
access_token,
|
|
configured_project_id=self._configured_project_id,
|
|
env_project_id=env_project,
|
|
user_agent_model=model,
|
|
)
|
|
# Persist discovered project back to the creds file so the next
|
|
# session doesn't re-run the discovery.
|
|
if ctx.project_id or ctx.managed_project_id:
|
|
google_oauth.update_project_ids(
|
|
project_id=ctx.project_id,
|
|
managed_project_id=ctx.managed_project_id,
|
|
)
|
|
self._project_context = ctx
|
|
return ctx
|
|
|
|
def _create_chat_completion(
|
|
self,
|
|
*,
|
|
model: str = "gemini-2.5-flash",
|
|
messages: Optional[List[Dict[str, Any]]] = None,
|
|
stream: bool = False,
|
|
tools: Any = None,
|
|
tool_choice: Any = None,
|
|
temperature: Optional[float] = None,
|
|
max_tokens: Optional[int] = None,
|
|
top_p: Optional[float] = None,
|
|
stop: Any = None,
|
|
extra_body: Optional[Dict[str, Any]] = None,
|
|
timeout: Any = None,
|
|
**_: Any,
|
|
) -> Any:
|
|
access_token = google_oauth.get_valid_access_token()
|
|
ctx = self._ensure_project_context(access_token, model)
|
|
|
|
thinking_config = None
|
|
if isinstance(extra_body, dict):
|
|
thinking_config = extra_body.get("thinking_config") or extra_body.get("thinkingConfig")
|
|
|
|
inner = build_gemini_request(
|
|
messages=messages or [],
|
|
tools=tools,
|
|
tool_choice=tool_choice,
|
|
temperature=temperature,
|
|
max_tokens=max_tokens,
|
|
top_p=top_p,
|
|
stop=stop,
|
|
thinking_config=thinking_config,
|
|
)
|
|
wrapped = wrap_code_assist_request(
|
|
project_id=ctx.project_id,
|
|
model=model,
|
|
inner_request=inner,
|
|
)
|
|
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
"Accept": "application/json",
|
|
"Authorization": f"Bearer {access_token}",
|
|
"User-Agent": "hermes-agent (gemini-cli-compat)",
|
|
"X-Goog-Api-Client": "gl-python/hermes",
|
|
"x-activity-request-id": str(uuid.uuid4()),
|
|
}
|
|
headers.update(self._default_headers)
|
|
|
|
if stream:
|
|
return self._stream_completion(model=model, wrapped=wrapped, headers=headers)
|
|
|
|
url = f"{CODE_ASSIST_ENDPOINT}/v1internal:generateContent"
|
|
response = self._http.post(url, json=wrapped, headers=headers)
|
|
if response.status_code != 200:
|
|
raise _gemini_http_error(response)
|
|
try:
|
|
payload = response.json()
|
|
except ValueError as exc:
|
|
raise CodeAssistError(
|
|
f"Invalid JSON from Code Assist: {exc}",
|
|
code="code_assist_invalid_json",
|
|
) from exc
|
|
return _translate_gemini_response(payload, model=model)
|
|
|
|
def _stream_completion(
|
|
self,
|
|
*,
|
|
model: str,
|
|
wrapped: Dict[str, Any],
|
|
headers: Dict[str, str],
|
|
) -> Iterator[_GeminiStreamChunk]:
|
|
"""Generator that yields OpenAI-shaped streaming chunks."""
|
|
url = f"{CODE_ASSIST_ENDPOINT}/v1internal:streamGenerateContent?alt=sse"
|
|
stream_headers = dict(headers)
|
|
stream_headers["Accept"] = "text/event-stream"
|
|
|
|
def _generator() -> Iterator[_GeminiStreamChunk]:
|
|
try:
|
|
with self._http.stream("POST", url, json=wrapped, headers=stream_headers) as response:
|
|
if response.status_code != 200:
|
|
# Materialize error body for better diagnostics
|
|
response.read()
|
|
raise _gemini_http_error(response)
|
|
tool_call_indices: Dict[str, int] = {}
|
|
for event in _iter_sse_events(response):
|
|
for chunk in _translate_stream_event(event, model, tool_call_indices):
|
|
yield chunk
|
|
except httpx.HTTPError as exc:
|
|
raise CodeAssistError(
|
|
f"Streaming request failed: {exc}",
|
|
code="code_assist_stream_error",
|
|
) from exc
|
|
|
|
return _generator()
|
|
|
|
|
|
def _gemini_http_error(response: httpx.Response) -> CodeAssistError:
|
|
"""Translate an httpx response into a CodeAssistError with rich metadata.
|
|
|
|
Parses Google's error envelope (``{"error": {"code", "message", "status",
|
|
"details": [...]}}``) so the agent's error classifier can reason about
|
|
the failure — ``status_code`` enables the rate_limit / auth classification
|
|
paths, and ``response`` lets the main loop honor ``Retry-After`` just
|
|
like it does for OpenAI SDK exceptions.
|
|
|
|
Also lifts a few recognizable Google conditions into human-readable
|
|
messages so the user sees something better than a 500-char JSON dump:
|
|
|
|
MODEL_CAPACITY_EXHAUSTED → "Gemini model capacity exhausted for
|
|
<model>. This is a Google-side throttle..."
|
|
RESOURCE_EXHAUSTED w/o reason → quota-style message
|
|
404 → "Model <name> not found at cloudcode-pa..."
|
|
"""
|
|
status = response.status_code
|
|
|
|
# Parse the body once, surviving any weird encodings.
|
|
body_text = ""
|
|
body_json: Dict[str, Any] = {}
|
|
try:
|
|
body_text = response.text
|
|
except Exception:
|
|
body_text = ""
|
|
if body_text:
|
|
try:
|
|
parsed = json.loads(body_text)
|
|
if isinstance(parsed, dict):
|
|
body_json = parsed
|
|
except (ValueError, TypeError):
|
|
body_json = {}
|
|
|
|
# Dig into Google's error envelope. Shape is:
|
|
# {"error": {"code": 429, "message": "...", "status": "RESOURCE_EXHAUSTED",
|
|
# "details": [{"@type": ".../ErrorInfo", "reason": "MODEL_CAPACITY_EXHAUSTED",
|
|
# "metadata": {...}},
|
|
# {"@type": ".../RetryInfo", "retryDelay": "30s"}]}}
|
|
err_obj = body_json.get("error") if isinstance(body_json, dict) else None
|
|
if not isinstance(err_obj, dict):
|
|
err_obj = {}
|
|
err_status = str(err_obj.get("status") or "").strip()
|
|
err_message = str(err_obj.get("message") or "").strip()
|
|
err_details_list = err_obj.get("details") if isinstance(err_obj.get("details"), list) else []
|
|
|
|
# Extract google.rpc.ErrorInfo reason + metadata. There may be more
|
|
# than one ErrorInfo (rare), so we pick the first one with a reason.
|
|
error_reason = ""
|
|
error_metadata: Dict[str, Any] = {}
|
|
retry_delay_seconds: Optional[float] = None
|
|
for detail in err_details_list:
|
|
if not isinstance(detail, dict):
|
|
continue
|
|
type_url = str(detail.get("@type") or "")
|
|
if not error_reason and type_url.endswith("/google.rpc.ErrorInfo"):
|
|
reason = detail.get("reason")
|
|
if isinstance(reason, str) and reason:
|
|
error_reason = reason
|
|
md = detail.get("metadata")
|
|
if isinstance(md, dict):
|
|
error_metadata = md
|
|
elif retry_delay_seconds is None and type_url.endswith("/google.rpc.RetryInfo"):
|
|
# retryDelay is a google.protobuf.Duration string like "30s" or "1.5s".
|
|
delay_raw = detail.get("retryDelay")
|
|
if isinstance(delay_raw, str) and delay_raw.endswith("s"):
|
|
try:
|
|
retry_delay_seconds = float(delay_raw[:-1])
|
|
except ValueError:
|
|
pass
|
|
elif isinstance(delay_raw, (int, float)):
|
|
retry_delay_seconds = float(delay_raw)
|
|
|
|
# Fall back to the Retry-After header if the body didn't include RetryInfo.
|
|
if retry_delay_seconds is None:
|
|
try:
|
|
header_val = response.headers.get("Retry-After") or response.headers.get("retry-after")
|
|
except Exception:
|
|
header_val = None
|
|
if header_val:
|
|
try:
|
|
retry_delay_seconds = float(header_val)
|
|
except (TypeError, ValueError):
|
|
retry_delay_seconds = None
|
|
|
|
# Classify the error code. ``code_assist_rate_limited`` stays the default
|
|
# for 429s; a more specific reason tag helps downstream callers (e.g. tests,
|
|
# logs) without changing the rate_limit classification path.
|
|
code = f"code_assist_http_{status}"
|
|
if status == 401:
|
|
code = "code_assist_unauthorized"
|
|
elif status == 429:
|
|
code = "code_assist_rate_limited"
|
|
if error_reason == "MODEL_CAPACITY_EXHAUSTED":
|
|
code = "code_assist_capacity_exhausted"
|
|
|
|
# Build a human-readable message. Keep the status + a raw-body tail for
|
|
# debugging, but lead with a friendlier summary when we recognize the
|
|
# Google signal.
|
|
model_hint = ""
|
|
if isinstance(error_metadata, dict):
|
|
model_hint = str(error_metadata.get("model") or error_metadata.get("modelId") or "").strip()
|
|
|
|
if status == 429 and error_reason == "MODEL_CAPACITY_EXHAUSTED":
|
|
target = model_hint or "this Gemini model"
|
|
message = (
|
|
f"Gemini capacity exhausted for {target} (Google-side throttle, "
|
|
f"not a Hermes issue). Try a different Gemini model or set a "
|
|
f"fallback_providers entry to a non-Gemini provider."
|
|
)
|
|
if retry_delay_seconds is not None:
|
|
message += f" Google suggests retrying in {retry_delay_seconds:g}s."
|
|
elif status == 429 and err_status == "RESOURCE_EXHAUSTED":
|
|
message = (
|
|
f"Gemini quota exhausted ({err_message or 'RESOURCE_EXHAUSTED'}). "
|
|
f"Check /gquota for remaining daily requests."
|
|
)
|
|
if retry_delay_seconds is not None:
|
|
message += f" Retry suggested in {retry_delay_seconds:g}s."
|
|
elif status == 404:
|
|
# Google returns 404 when a model has been retired or renamed.
|
|
target = model_hint or (err_message or "model")
|
|
message = (
|
|
f"Code Assist 404: {target} is not available at "
|
|
f"cloudcode-pa.googleapis.com. It may have been renamed or "
|
|
f"retired. Check hermes_cli/models.py for the current list."
|
|
)
|
|
elif err_message:
|
|
# Generic fallback with the parsed message.
|
|
message = f"Code Assist HTTP {status} ({err_status or 'error'}): {err_message}"
|
|
else:
|
|
# Last-ditch fallback — raw body snippet.
|
|
message = f"Code Assist returned HTTP {status}: {body_text[:500]}"
|
|
|
|
return CodeAssistError(
|
|
message,
|
|
code=code,
|
|
status_code=status,
|
|
response=response,
|
|
retry_after=retry_delay_seconds,
|
|
details={
|
|
"status": err_status,
|
|
"reason": error_reason,
|
|
"metadata": error_metadata,
|
|
"message": err_message,
|
|
},
|
|
)
|