"""OpenAI-compatible facade that talks to Google's Cloud Code Assist backend. This adapter lets Hermes use the ``google-gemini-cli`` provider as if it were a standard OpenAI-shaped chat completion endpoint, while the underlying HTTP traffic goes to ``cloudcode-pa.googleapis.com/v1internal:{generateContent, streamGenerateContent}`` with a Bearer access token obtained via OAuth PKCE. Architecture ------------ - ``GeminiCloudCodeClient`` exposes ``.chat.completions.create(**kwargs)`` mirroring the subset of the OpenAI SDK that ``run_agent.py`` uses. - Incoming OpenAI ``messages[]`` / ``tools[]`` / ``tool_choice`` are translated to Gemini's native ``contents[]`` / ``tools[].functionDeclarations`` / ``toolConfig`` / ``systemInstruction`` shape. - The request body is wrapped ``{project, model, user_prompt_id, request}`` per Code Assist API expectations. - Responses (``candidates[].content.parts[]``) are converted back to OpenAI ``choices[0].message`` shape with ``content`` + ``tool_calls``. - Streaming uses SSE (``?alt=sse``) and yields OpenAI-shaped delta chunks. Attribution ----------- Translation semantics follow jenslys/opencode-gemini-auth (MIT) and the public Gemini API docs. Request envelope shape (``{project, model, user_prompt_id, request}``) is documented nowhere; it is reverse-engineered from the opencode-gemini-auth and clawdbot implementations. """ from __future__ import annotations import json import logging import time import uuid from types import SimpleNamespace from typing import Any, Dict, Iterator, List, Optional import httpx from agent import google_oauth from agent.gemini_schema import sanitize_gemini_tool_parameters from agent.google_code_assist import ( CODE_ASSIST_ENDPOINT, CodeAssistError, ProjectContext, resolve_project_context, ) logger = logging.getLogger(__name__) # ============================================================================= # Request translation: OpenAI → Gemini # ============================================================================= _ROLE_MAP_OPENAI_TO_GEMINI = { "user": "user", "assistant": "model", "system": "user", # handled separately via systemInstruction "tool": "user", # functionResponse is wrapped in a user-role turn "function": "user", } def _coerce_content_to_text(content: Any) -> str: """OpenAI content may be str or a list of parts; reduce to plain text.""" if content is None: return "" if isinstance(content, str): return content if isinstance(content, list): pieces: List[str] = [] for p in content: if isinstance(p, str): pieces.append(p) elif isinstance(p, dict): if p.get("type") == "text" and isinstance(p.get("text"), str): pieces.append(p["text"]) # Multimodal (image_url, etc.) — stub for now; log and skip elif p.get("type") in ("image_url", "input_audio"): logger.debug("Dropping multimodal part (not yet supported): %s", p.get("type")) return "\n".join(pieces) return str(content) def _translate_tool_call_to_gemini(tool_call: Dict[str, Any]) -> Dict[str, Any]: """OpenAI tool_call -> Gemini functionCall part.""" fn = tool_call.get("function") or {} args_raw = fn.get("arguments", "") try: args = json.loads(args_raw) if isinstance(args_raw, str) and args_raw else {} except json.JSONDecodeError: args = {"_raw": args_raw} if not isinstance(args, dict): args = {"_value": args} return { "functionCall": { "name": fn.get("name") or "", "args": args, }, # Sentinel signature — matches opencode-gemini-auth's approach. # Without this, Code Assist rejects function calls that originated # outside its own chain. 
"thoughtSignature": "skip_thought_signature_validator", } def _translate_tool_result_to_gemini(message: Dict[str, Any]) -> Dict[str, Any]: """OpenAI tool-role message -> Gemini functionResponse part. The function name isn't in the OpenAI tool message directly; it must be passed via the assistant message that issued the call. For simplicity we look up ``name`` on the message (OpenAI SDK copies it there) or on the ``tool_call_id`` cross-reference. """ name = str(message.get("name") or message.get("tool_call_id") or "tool") content = _coerce_content_to_text(message.get("content")) # Gemini expects the response as a dict under `response`. We wrap plain # text in {"output": "..."}. try: parsed = json.loads(content) if content.strip().startswith(("{", "[")) else None except json.JSONDecodeError: parsed = None response = parsed if isinstance(parsed, dict) else {"output": content} return { "functionResponse": { "name": name, "response": response, }, } def _build_gemini_contents( messages: List[Dict[str, Any]], ) -> tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]: """Convert OpenAI messages[] to Gemini contents[] + systemInstruction.""" system_text_parts: List[str] = [] contents: List[Dict[str, Any]] = [] for msg in messages: if not isinstance(msg, dict): continue role = str(msg.get("role") or "user") if role == "system": system_text_parts.append(_coerce_content_to_text(msg.get("content"))) continue # Tool result message — emit a user-role turn with functionResponse if role == "tool" or role == "function": contents.append({ "role": "user", "parts": [_translate_tool_result_to_gemini(msg)], }) continue gemini_role = _ROLE_MAP_OPENAI_TO_GEMINI.get(role, "user") parts: List[Dict[str, Any]] = [] text = _coerce_content_to_text(msg.get("content")) if text: parts.append({"text": text}) # Assistant messages can carry tool_calls tool_calls = msg.get("tool_calls") or [] if isinstance(tool_calls, list): for tc in tool_calls: if isinstance(tc, dict): parts.append(_translate_tool_call_to_gemini(tc)) if not parts: # Gemini rejects empty parts; skip the turn entirely continue contents.append({"role": gemini_role, "parts": parts}) system_instruction: Optional[Dict[str, Any]] = None joined_system = "\n".join(p for p in system_text_parts if p).strip() if joined_system: system_instruction = { "role": "system", "parts": [{"text": joined_system}], } return contents, system_instruction def _translate_tools_to_gemini(tools: Any) -> List[Dict[str, Any]]: """OpenAI tools[] -> Gemini tools[].functionDeclarations[].""" if not isinstance(tools, list) or not tools: return [] declarations: List[Dict[str, Any]] = [] for t in tools: if not isinstance(t, dict): continue fn = t.get("function") or {} if not isinstance(fn, dict): continue name = fn.get("name") if not name: continue decl = {"name": str(name)} if fn.get("description"): decl["description"] = str(fn["description"]) params = fn.get("parameters") if isinstance(params, dict): decl["parameters"] = sanitize_gemini_tool_parameters(params) declarations.append(decl) if not declarations: return [] return [{"functionDeclarations": declarations}] def _translate_tool_choice_to_gemini(tool_choice: Any) -> Optional[Dict[str, Any]]: """OpenAI tool_choice -> Gemini toolConfig.functionCallingConfig.""" if tool_choice is None: return None if isinstance(tool_choice, str): if tool_choice == "auto": return {"functionCallingConfig": {"mode": "AUTO"}} if tool_choice == "required": return {"functionCallingConfig": {"mode": "ANY"}} if tool_choice == "none": return {"functionCallingConfig": 
{"mode": "NONE"}} if isinstance(tool_choice, dict): fn = tool_choice.get("function") or {} name = fn.get("name") if name: return { "functionCallingConfig": { "mode": "ANY", "allowedFunctionNames": [str(name)], }, } return None def _normalize_thinking_config(config: Any) -> Optional[Dict[str, Any]]: """Accept thinkingBudget / thinkingLevel / includeThoughts (+ snake_case).""" if not isinstance(config, dict) or not config: return None budget = config.get("thinkingBudget", config.get("thinking_budget")) level = config.get("thinkingLevel", config.get("thinking_level")) include = config.get("includeThoughts", config.get("include_thoughts")) normalized: Dict[str, Any] = {} if isinstance(budget, (int, float)): normalized["thinkingBudget"] = int(budget) if isinstance(level, str) and level.strip(): normalized["thinkingLevel"] = level.strip().lower() if isinstance(include, bool): normalized["includeThoughts"] = include return normalized or None def build_gemini_request( *, messages: List[Dict[str, Any]], tools: Any = None, tool_choice: Any = None, temperature: Optional[float] = None, max_tokens: Optional[int] = None, top_p: Optional[float] = None, stop: Any = None, thinking_config: Any = None, ) -> Dict[str, Any]: """Build the inner Gemini request body (goes inside ``request`` wrapper).""" contents, system_instruction = _build_gemini_contents(messages) body: Dict[str, Any] = {"contents": contents} if system_instruction is not None: body["systemInstruction"] = system_instruction gemini_tools = _translate_tools_to_gemini(tools) if gemini_tools: body["tools"] = gemini_tools tool_cfg = _translate_tool_choice_to_gemini(tool_choice) if tool_cfg is not None: body["toolConfig"] = tool_cfg generation_config: Dict[str, Any] = {} if isinstance(temperature, (int, float)): generation_config["temperature"] = float(temperature) if isinstance(max_tokens, int) and max_tokens > 0: generation_config["maxOutputTokens"] = max_tokens if isinstance(top_p, (int, float)): generation_config["topP"] = float(top_p) if isinstance(stop, str) and stop: generation_config["stopSequences"] = [stop] elif isinstance(stop, list) and stop: generation_config["stopSequences"] = [str(s) for s in stop if s] normalized_thinking = _normalize_thinking_config(thinking_config) if normalized_thinking: generation_config["thinkingConfig"] = normalized_thinking if generation_config: body["generationConfig"] = generation_config return body def wrap_code_assist_request( *, project_id: str, model: str, inner_request: Dict[str, Any], user_prompt_id: Optional[str] = None, ) -> Dict[str, Any]: """Wrap the inner Gemini request in the Code Assist envelope.""" return { "project": project_id, "model": model, "user_prompt_id": user_prompt_id or str(uuid.uuid4()), "request": inner_request, } # ============================================================================= # Response translation: Gemini → OpenAI # ============================================================================= def _translate_gemini_response( resp: Dict[str, Any], model: str, ) -> SimpleNamespace: """Non-streaming Gemini response -> OpenAI-shaped SimpleNamespace. Code Assist wraps the actual Gemini response inside ``response``, so we unwrap it first if present. 
""" inner = resp.get("response") if isinstance(resp.get("response"), dict) else resp candidates = inner.get("candidates") or [] if not isinstance(candidates, list) or not candidates: return _empty_response(model) cand = candidates[0] content_obj = cand.get("content") if isinstance(cand, dict) else {} parts = content_obj.get("parts") if isinstance(content_obj, dict) else [] text_pieces: List[str] = [] reasoning_pieces: List[str] = [] tool_calls: List[SimpleNamespace] = [] for i, part in enumerate(parts or []): if not isinstance(part, dict): continue # Thought parts are model's internal reasoning — surface as reasoning, # don't mix into content. if part.get("thought") is True: if isinstance(part.get("text"), str): reasoning_pieces.append(part["text"]) continue if isinstance(part.get("text"), str): text_pieces.append(part["text"]) continue fc = part.get("functionCall") if isinstance(fc, dict) and fc.get("name"): try: args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False) except (TypeError, ValueError): args_str = "{}" tool_calls.append(SimpleNamespace( id=f"call_{uuid.uuid4().hex[:12]}", type="function", index=i, function=SimpleNamespace(name=str(fc["name"]), arguments=args_str), )) finish_reason = "tool_calls" if tool_calls else _map_gemini_finish_reason( str(cand.get("finishReason") or "") ) usage_meta = inner.get("usageMetadata") or {} usage = SimpleNamespace( prompt_tokens=int(usage_meta.get("promptTokenCount") or 0), completion_tokens=int(usage_meta.get("candidatesTokenCount") or 0), total_tokens=int(usage_meta.get("totalTokenCount") or 0), prompt_tokens_details=SimpleNamespace( cached_tokens=int(usage_meta.get("cachedContentTokenCount") or 0), ), ) message = SimpleNamespace( role="assistant", content="".join(text_pieces) if text_pieces else None, tool_calls=tool_calls or None, reasoning="".join(reasoning_pieces) or None, reasoning_content="".join(reasoning_pieces) or None, reasoning_details=None, ) choice = SimpleNamespace( index=0, message=message, finish_reason=finish_reason, ) return SimpleNamespace( id=f"chatcmpl-{uuid.uuid4().hex[:12]}", object="chat.completion", created=int(time.time()), model=model, choices=[choice], usage=usage, ) def _empty_response(model: str) -> SimpleNamespace: message = SimpleNamespace( role="assistant", content="", tool_calls=None, reasoning=None, reasoning_content=None, reasoning_details=None, ) choice = SimpleNamespace(index=0, message=message, finish_reason="stop") usage = SimpleNamespace( prompt_tokens=0, completion_tokens=0, total_tokens=0, prompt_tokens_details=SimpleNamespace(cached_tokens=0), ) return SimpleNamespace( id=f"chatcmpl-{uuid.uuid4().hex[:12]}", object="chat.completion", created=int(time.time()), model=model, choices=[choice], usage=usage, ) def _map_gemini_finish_reason(reason: str) -> str: mapping = { "STOP": "stop", "MAX_TOKENS": "length", "SAFETY": "content_filter", "RECITATION": "content_filter", "OTHER": "stop", } return mapping.get(reason.upper(), "stop") # ============================================================================= # Streaming SSE iterator # ============================================================================= class _GeminiStreamChunk(SimpleNamespace): """Mimics an OpenAI ChatCompletionChunk with .choices[0].delta.""" pass def _make_stream_chunk( *, model: str, content: str = "", tool_call_delta: Optional[Dict[str, Any]] = None, finish_reason: Optional[str] = None, reasoning: str = "", ) -> _GeminiStreamChunk: delta_kwargs: Dict[str, Any] = {"role": "assistant"} if content: 
delta_kwargs["content"] = content if tool_call_delta is not None: delta_kwargs["tool_calls"] = [SimpleNamespace( index=tool_call_delta.get("index", 0), id=tool_call_delta.get("id") or f"call_{uuid.uuid4().hex[:12]}", type="function", function=SimpleNamespace( name=tool_call_delta.get("name") or "", arguments=tool_call_delta.get("arguments") or "", ), )] if reasoning: delta_kwargs["reasoning"] = reasoning delta_kwargs["reasoning_content"] = reasoning delta = SimpleNamespace(**delta_kwargs) choice = SimpleNamespace(index=0, delta=delta, finish_reason=finish_reason) return _GeminiStreamChunk( id=f"chatcmpl-{uuid.uuid4().hex[:12]}", object="chat.completion.chunk", created=int(time.time()), model=model, choices=[choice], usage=None, ) def _iter_sse_events(response: httpx.Response) -> Iterator[Dict[str, Any]]: """Parse Server-Sent Events from an httpx streaming response.""" buffer = "" for chunk in response.iter_text(): if not chunk: continue buffer += chunk while "\n" in buffer: line, buffer = buffer.split("\n", 1) line = line.rstrip("\r") if not line: continue if line.startswith("data: "): data = line[6:] if data == "[DONE]": return try: yield json.loads(data) except json.JSONDecodeError: logger.debug("Non-JSON SSE line: %s", data[:200]) def _translate_stream_event( event: Dict[str, Any], model: str, tool_call_counter: List[int], ) -> List[_GeminiStreamChunk]: """Unwrap Code Assist envelope and emit OpenAI-shaped chunk(s). ``tool_call_counter`` is a single-element list used as a mutable counter across events in the same stream. Each ``functionCall`` part gets a fresh, unique OpenAI ``index`` — keying by function name would collide whenever the model issues parallel calls to the same tool (e.g. reading three files in one turn). """ inner = event.get("response") if isinstance(event.get("response"), dict) else event candidates = inner.get("candidates") or [] if not candidates: return [] cand = candidates[0] if not isinstance(cand, dict): return [] chunks: List[_GeminiStreamChunk] = [] content = cand.get("content") or {} parts = content.get("parts") if isinstance(content, dict) else [] for part in parts or []: if not isinstance(part, dict): continue if part.get("thought") is True and isinstance(part.get("text"), str): chunks.append(_make_stream_chunk( model=model, reasoning=part["text"], )) continue if isinstance(part.get("text"), str) and part["text"]: chunks.append(_make_stream_chunk(model=model, content=part["text"])) fc = part.get("functionCall") if isinstance(fc, dict) and fc.get("name"): name = str(fc["name"]) idx = tool_call_counter[0] tool_call_counter[0] += 1 try: args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False) except (TypeError, ValueError): args_str = "{}" chunks.append(_make_stream_chunk( model=model, tool_call_delta={ "index": idx, "name": name, "arguments": args_str, }, )) finish_reason_raw = str(cand.get("finishReason") or "") if finish_reason_raw: mapped = _map_gemini_finish_reason(finish_reason_raw) if tool_call_counter[0] > 0: mapped = "tool_calls" chunks.append(_make_stream_chunk(model=model, finish_reason=mapped)) return chunks # ============================================================================= # GeminiCloudCodeClient — OpenAI-compatible facade # ============================================================================= MARKER_BASE_URL = "cloudcode-pa://google" class _GeminiChatCompletions: def __init__(self, client: "GeminiCloudCodeClient"): self._client = client def create(self, **kwargs: Any) -> Any: return 

def _translate_stream_event(
    event: Dict[str, Any],
    model: str,
    tool_call_counter: List[int],
) -> List[_GeminiStreamChunk]:
    """Unwrap Code Assist envelope and emit OpenAI-shaped chunk(s).

    ``tool_call_counter`` is a single-element list used as a mutable counter
    across events in the same stream. Each ``functionCall`` part gets a fresh,
    unique OpenAI ``index`` — keying by function name would collide whenever
    the model issues parallel calls to the same tool (e.g. reading three files
    in one turn).
    """
    inner = event.get("response") if isinstance(event.get("response"), dict) else event
    candidates = inner.get("candidates") or []
    if not candidates:
        return []
    cand = candidates[0]
    if not isinstance(cand, dict):
        return []

    chunks: List[_GeminiStreamChunk] = []
    content = cand.get("content") or {}
    parts = content.get("parts") if isinstance(content, dict) else []
    for part in parts or []:
        if not isinstance(part, dict):
            continue
        if part.get("thought") is True and isinstance(part.get("text"), str):
            chunks.append(_make_stream_chunk(
                model=model,
                reasoning=part["text"],
            ))
            continue
        if isinstance(part.get("text"), str) and part["text"]:
            chunks.append(_make_stream_chunk(model=model, content=part["text"]))
        fc = part.get("functionCall")
        if isinstance(fc, dict) and fc.get("name"):
            name = str(fc["name"])
            idx = tool_call_counter[0]
            tool_call_counter[0] += 1
            try:
                args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False)
            except (TypeError, ValueError):
                args_str = "{}"
            chunks.append(_make_stream_chunk(
                model=model,
                tool_call_delta={
                    "index": idx,
                    "name": name,
                    "arguments": args_str,
                },
            ))

    finish_reason_raw = str(cand.get("finishReason") or "")
    if finish_reason_raw:
        mapped = _map_gemini_finish_reason(finish_reason_raw)
        if tool_call_counter[0] > 0:
            mapped = "tool_calls"
        chunks.append(_make_stream_chunk(model=model, finish_reason=mapped))
    return chunks
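
# Illustrative sketch (comments only): one SSE event carrying both a thought
# part and a functionCall part becomes two OpenAI-shaped chunks. The tool
# name is hypothetical.
#
#     counter = [0]
#     chunks = _translate_stream_event(
#         {"response": {"candidates": [{"content": {"parts": [
#             {"thought": True, "text": "I should list the directory."},
#             {"functionCall": {"name": "list_dir", "args": {"path": "."}}},
#         ]}}]}},
#         model="gemini-2.5-flash",
#         tool_call_counter=counter,
#     )
#     # chunks[0].choices[0].delta.reasoning                   -> "I should list the directory."
#     # chunks[1].choices[0].delta.tool_calls[0].index         -> 0
#     # chunks[1].choices[0].delta.tool_calls[0].function.name -> "list_dir"
#     # counter                                                -> [1]
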

# =============================================================================
# GeminiCloudCodeClient — OpenAI-compatible facade
# =============================================================================

MARKER_BASE_URL = "cloudcode-pa://google"


class _GeminiChatCompletions:
    def __init__(self, client: "GeminiCloudCodeClient"):
        self._client = client

    def create(self, **kwargs: Any) -> Any:
        return self._client._create_chat_completion(**kwargs)


class _GeminiChatNamespace:
    def __init__(self, client: "GeminiCloudCodeClient"):
        self.completions = _GeminiChatCompletions(client)


class GeminiCloudCodeClient:
    """Minimal OpenAI-SDK-compatible facade over Code Assist v1internal."""

    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        default_headers: Optional[Dict[str, str]] = None,
        project_id: str = "",
        **_: Any,
    ):
        # `api_key` here is a dummy — real auth is the OAuth access token
        # fetched on every call via agent.google_oauth.get_valid_access_token().
        # We accept the kwarg for openai.OpenAI interface parity.
        self.api_key = api_key or "google-oauth"
        self.base_url = base_url or MARKER_BASE_URL
        self._default_headers = dict(default_headers or {})
        self._configured_project_id = project_id
        self._project_context: Optional[ProjectContext] = None
        self._project_context_lock = False  # simple single-thread guard
        self.chat = _GeminiChatNamespace(self)
        self.is_closed = False
        self._http = httpx.Client(
            timeout=httpx.Timeout(connect=15.0, read=600.0, write=30.0, pool=30.0)
        )

    def close(self) -> None:
        self.is_closed = True
        try:
            self._http.close()
        except Exception:
            pass

    # Implement the OpenAI SDK's context-manager-ish closure check
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def _ensure_project_context(self, access_token: str, model: str) -> ProjectContext:
        """Lazily resolve and cache the project context for this client."""
        if self._project_context is not None:
            return self._project_context

        env_project = google_oauth.resolve_project_id_from_env()
        creds = google_oauth.load_credentials()
        stored_project = creds.project_id if creds else ""

        # Prefer what's already baked into the creds
        if stored_project:
            self._project_context = ProjectContext(
                project_id=stored_project,
                managed_project_id=creds.managed_project_id if creds else "",
                tier_id="",
                source="stored",
            )
            return self._project_context

        ctx = resolve_project_context(
            access_token,
            configured_project_id=self._configured_project_id,
            env_project_id=env_project,
            user_agent_model=model,
        )
        # Persist discovered project back to the creds file so the next
        # session doesn't re-run the discovery.
        if ctx.project_id or ctx.managed_project_id:
            google_oauth.update_project_ids(
                project_id=ctx.project_id,
                managed_project_id=ctx.managed_project_id,
            )
        self._project_context = ctx
        return ctx

    def _create_chat_completion(
        self,
        *,
        model: str = "gemini-2.5-flash",
        messages: Optional[List[Dict[str, Any]]] = None,
        stream: bool = False,
        tools: Any = None,
        tool_choice: Any = None,
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
        top_p: Optional[float] = None,
        stop: Any = None,
        extra_body: Optional[Dict[str, Any]] = None,
        timeout: Any = None,
        **_: Any,
    ) -> Any:
        access_token = google_oauth.get_valid_access_token()
        ctx = self._ensure_project_context(access_token, model)

        thinking_config = None
        if isinstance(extra_body, dict):
            thinking_config = extra_body.get("thinking_config") or extra_body.get("thinkingConfig")

        inner = build_gemini_request(
            messages=messages or [],
            tools=tools,
            tool_choice=tool_choice,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            stop=stop,
            thinking_config=thinking_config,
        )
        wrapped = wrap_code_assist_request(
            project_id=ctx.project_id,
            model=model,
            inner_request=inner,
        )
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": f"Bearer {access_token}",
            "User-Agent": "hermes-agent (gemini-cli-compat)",
            "X-Goog-Api-Client": "gl-python/hermes",
            "x-activity-request-id": str(uuid.uuid4()),
        }
        headers.update(self._default_headers)

        if stream:
            return self._stream_completion(model=model, wrapped=wrapped, headers=headers)

        url = f"{CODE_ASSIST_ENDPOINT}/v1internal:generateContent"
        response = self._http.post(url, json=wrapped, headers=headers)
        if response.status_code != 200:
            raise _gemini_http_error(response)
        try:
            payload = response.json()
        except ValueError as exc:
            raise CodeAssistError(
                f"Invalid JSON from Code Assist: {exc}",
                code="code_assist_invalid_json",
            ) from exc
        return _translate_gemini_response(payload, model=model)

    def _stream_completion(
        self,
        *,
        model: str,
        wrapped: Dict[str, Any],
        headers: Dict[str, str],
    ) -> Iterator[_GeminiStreamChunk]:
        """Generator that yields OpenAI-shaped streaming chunks."""
        url = f"{CODE_ASSIST_ENDPOINT}/v1internal:streamGenerateContent?alt=sse"
        stream_headers = dict(headers)
        stream_headers["Accept"] = "text/event-stream"

        def _generator() -> Iterator[_GeminiStreamChunk]:
            try:
                with self._http.stream("POST", url, json=wrapped, headers=stream_headers) as response:
                    if response.status_code != 200:
                        # Materialize error body for better diagnostics
                        response.read()
                        raise _gemini_http_error(response)
                    tool_call_counter: List[int] = [0]
                    for event in _iter_sse_events(response):
                        for chunk in _translate_stream_event(event, model, tool_call_counter):
                            yield chunk
            except httpx.HTTPError as exc:
                raise CodeAssistError(
                    f"Streaming request failed: {exc}",
                    code="code_assist_stream_error",
                ) from exc

        return _generator()
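
# Illustrative usage sketch (comments only): how run_agent.py-style code talks
# to this facade. It assumes agent.google_oauth already holds valid cached
# credentials; the tool definition below is hypothetical.
#
#     client = GeminiCloudCodeClient()
#     resp = client.chat.completions.create(
#         model="gemini-2.5-flash",
#         messages=[{"role": "user", "content": "What files are here?"}],
#         tools=[{"type": "function", "function": {
#             "name": "list_dir",
#             "description": "List a directory",
#             "parameters": {"type": "object",
#                            "properties": {"path": {"type": "string"}},
#                            "required": ["path"]},
#         }}],
#         tool_choice="auto",
#     )
#     if resp.choices[0].message.tool_calls:
#         call = resp.choices[0].message.tool_calls[0]
#         # dispatch call.function.name with json.loads(call.function.arguments)
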

def _gemini_http_error(response: httpx.Response) -> CodeAssistError:
    """Translate an httpx response into a CodeAssistError with rich metadata.

    Parses Google's error envelope (``{"error": {"code", "message", "status",
    "details": [...]}}``) so the agent's error classifier can reason about the
    failure — ``status_code`` enables the rate_limit / auth classification
    paths, and ``response`` lets the main loop honor ``Retry-After`` just like
    it does for OpenAI SDK exceptions.

    Also lifts a few recognizable Google conditions into human-readable
    messages so the user sees something better than a 500-char JSON dump:

        MODEL_CAPACITY_EXHAUSTED → "Gemini model capacity exhausted for
            <model>. This is a Google-side throttle..."
        RESOURCE_EXHAUSTED w/o reason → quota-style message
        404 → "Model not found at cloudcode-pa..."
    """
    status = response.status_code

    # Parse the body once, surviving any weird encodings.
    body_text = ""
    body_json: Dict[str, Any] = {}
    try:
        body_text = response.text
    except Exception:
        body_text = ""
    if body_text:
        try:
            parsed = json.loads(body_text)
            if isinstance(parsed, dict):
                body_json = parsed
        except (ValueError, TypeError):
            body_json = {}

    # Dig into Google's error envelope. Shape is:
    #   {"error": {"code": 429, "message": "...", "status": "RESOURCE_EXHAUSTED",
    #              "details": [{"@type": ".../ErrorInfo", "reason": "MODEL_CAPACITY_EXHAUSTED",
    #                           "metadata": {...}},
    #                          {"@type": ".../RetryInfo", "retryDelay": "30s"}]}}
    err_obj = body_json.get("error") if isinstance(body_json, dict) else None
    if not isinstance(err_obj, dict):
        err_obj = {}
    err_status = str(err_obj.get("status") or "").strip()
    err_message = str(err_obj.get("message") or "").strip()
    _raw_details = err_obj.get("details")
    err_details_list = _raw_details if isinstance(_raw_details, list) else []

    # Extract google.rpc.ErrorInfo reason + metadata. There may be more
    # than one ErrorInfo (rare), so we pick the first one with a reason.
    error_reason = ""
    error_metadata: Dict[str, Any] = {}
    retry_delay_seconds: Optional[float] = None
    for detail in err_details_list:
        if not isinstance(detail, dict):
            continue
        type_url = str(detail.get("@type") or "")
        if not error_reason and type_url.endswith("/google.rpc.ErrorInfo"):
            reason = detail.get("reason")
            if isinstance(reason, str) and reason:
                error_reason = reason
                md = detail.get("metadata")
                if isinstance(md, dict):
                    error_metadata = md
        elif retry_delay_seconds is None and type_url.endswith("/google.rpc.RetryInfo"):
            # retryDelay is a google.protobuf.Duration string like "30s" or "1.5s".
            delay_raw = detail.get("retryDelay")
            if isinstance(delay_raw, str) and delay_raw.endswith("s"):
                try:
                    retry_delay_seconds = float(delay_raw[:-1])
                except ValueError:
                    pass
            elif isinstance(delay_raw, (int, float)):
                retry_delay_seconds = float(delay_raw)

    # Fall back to the Retry-After header if the body didn't include RetryInfo.
    if retry_delay_seconds is None:
        try:
            header_val = response.headers.get("Retry-After") or response.headers.get("retry-after")
        except Exception:
            header_val = None
        if header_val:
            try:
                retry_delay_seconds = float(header_val)
            except (TypeError, ValueError):
                retry_delay_seconds = None

    # Classify the error code. ``code_assist_rate_limited`` stays the default
    # for 429s; a more specific reason tag helps downstream callers (e.g. tests,
    # logs) without changing the rate_limit classification path.
    code = f"code_assist_http_{status}"
    if status == 401:
        code = "code_assist_unauthorized"
    elif status == 429:
        code = "code_assist_rate_limited"
        if error_reason == "MODEL_CAPACITY_EXHAUSTED":
            code = "code_assist_capacity_exhausted"
    # Build a human-readable message. Keep the status + a raw-body tail for
    # debugging, but lead with a friendlier summary when we recognize the
    # Google signal.
    model_hint = ""
    if isinstance(error_metadata, dict):
        model_hint = str(error_metadata.get("model") or error_metadata.get("modelId") or "").strip()

    if status == 429 and error_reason == "MODEL_CAPACITY_EXHAUSTED":
        target = model_hint or "this Gemini model"
        message = (
            f"Gemini capacity exhausted for {target} (Google-side throttle, "
            f"not a Hermes issue). Try a different Gemini model or set a "
            f"fallback_providers entry to a non-Gemini provider."
        )
        if retry_delay_seconds is not None:
            message += f" Google suggests retrying in {retry_delay_seconds:g}s."
    elif status == 429 and err_status == "RESOURCE_EXHAUSTED":
        message = (
            f"Gemini quota exhausted ({err_message or 'RESOURCE_EXHAUSTED'}). "
            f"Check /gquota for remaining daily requests."
        )
        if retry_delay_seconds is not None:
            message += f" Retry suggested in {retry_delay_seconds:g}s."
    elif status == 404:
        # Google returns 404 when a model has been retired or renamed.
        target = model_hint or (err_message or "model")
        message = (
            f"Code Assist 404: {target} is not available at "
            f"cloudcode-pa.googleapis.com. It may have been renamed or "
            f"retired. Check hermes_cli/models.py for the current list."
        )
    elif err_message:
        # Generic fallback with the parsed message.
        message = f"Code Assist HTTP {status} ({err_status or 'error'}): {err_message}"
    else:
        # Last-ditch fallback — raw body snippet.
        message = f"Code Assist returned HTTP {status}: {body_text[:500]}"

    return CodeAssistError(
        message,
        code=code,
        status_code=status,
        response=response,
        retry_after=retry_delay_seconds,
        details={
            "status": err_status,
            "reason": error_reason,
            "metadata": error_metadata,
            "message": err_message,
        },
    )
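
if __name__ == "__main__":  # pragma: no cover
    # Minimal manual smoke-test sketch, assuming the google_oauth credential
    # cache already holds a valid login. Running this file directly issues one
    # real, quota-consuming streaming request against Code Assist; nothing here
    # runs on import.
    logging.basicConfig(level=logging.INFO)
    _client = GeminiCloudCodeClient()
    try:
        for _chunk in _client.chat.completions.create(
            model="gemini-2.5-flash",
            messages=[{"role": "user", "content": "Reply with the single word: pong"}],
            stream=True,
        ):
            _delta = _chunk.choices[0].delta
            if getattr(_delta, "content", None):
                print(_delta.content, end="", flush=True)
        print()
    finally:
        _client.close()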