# Mirror of https://github.com/NousResearch/hermes-agent.git
# Synced 2026-04-25 00:51:20 +00:00 (1394 lines, 53 KiB, Python)
"""Shared model-switching logic for CLI and gateway /model commands.
|
|
|
|
Both the CLI (cli.py) and gateway (gateway/run.py) /model handlers
|
|
share the same core pipeline:
|
|
|
|
parse flags -> alias resolution -> provider resolution ->
|
|
credential resolution -> normalize model name ->
|
|
metadata lookup -> build result
|
|
|
|
This module ties together the foundation layers:
|
|
|
|
- ``agent.models_dev`` -- models.dev catalog, ModelInfo, ProviderInfo
|
|
- ``hermes_cli.providers`` -- canonical provider identity + overlays
|
|
- ``hermes_cli.model_normalize`` -- per-provider name formatting
|
|
|
|
Provider switching uses the ``--provider`` flag exclusively.
|
|
No colon-based ``provider:model`` syntax — colons are reserved for
|
|
OpenRouter variant suffixes (``:free``, ``:extended``, ``:fast``).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import re
|
|
from dataclasses import dataclass
|
|
from typing import List, NamedTuple, Optional
|
|
|
|
from hermes_cli.providers import (
|
|
custom_provider_slug,
|
|
determine_api_mode,
|
|
get_label,
|
|
is_aggregator,
|
|
resolve_provider_full,
|
|
)
|
|
from hermes_cli.model_normalize import (
|
|
normalize_model_for_provider,
|
|
)
|
|
from agent.models_dev import (
|
|
ModelCapabilities,
|
|
ModelInfo,
|
|
get_model_capabilities,
|
|
get_model_info,
|
|
list_provider_models,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Non-agentic model warning
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Warning text returned by _check_hermes_model_warning() when the selected
# model name matches _NOUS_HERMES_NON_AGENTIC_RE below.
_HERMES_MODEL_WARNING = (
    "Nous Research Hermes 3 & 4 models are NOT agentic and are not designed "
    "for use with Hermes Agent. They lack the tool-calling capabilities "
    "required for agent workflows. Consider using an agentic model instead "
    "(Claude, GPT, Gemini, DeepSeek, etc.)."
)

# Match only the real Nous Research Hermes 3 / Hermes 4 chat families.
# The previous substring check (`"hermes" in name.lower()`) false-positived on
# unrelated local Modelfiles like ``hermes-brain:qwen3-14b-ctx16k`` that just
# happen to carry "hermes" in their tag but are fully tool-capable.
#
# Positive examples the regex must match:
#   NousResearch/Hermes-3-Llama-3.1-70B, hermes-4-405b, openrouter/hermes3:70b
# Negative examples it must NOT match:
#   hermes-brain:qwen3-14b-ctx16k, qwen3:14b, claude-opus-4-6
#
# Pattern anatomy:
#   (?:^|[/:])     "hermes" must start the name or follow a "/" or ":"
#   hermes[-_ ]?   the word itself plus at most one "-", "_" or space
#   [34]           the major version digit
#   (?:[-_.:]|$)   the digit must be followed by a separator or end the name
_NOUS_HERMES_NON_AGENTIC_RE = re.compile(
    r"(?:^|[/:])hermes[-_ ]?[34](?:[-_.:]|$)",
    re.IGNORECASE,
)
|
|
|
|
|
|
def is_nous_hermes_non_agentic(model_name: str) -> bool:
    """Tell whether *model_name* names a real Nous Hermes 3/4 chat model.

    The decision is delegated to ``_NOUS_HERMES_NON_AGENTIC_RE``; an empty
    (or otherwise falsy) name never matches.  The result drives the
    non-agentic warning, and both :mod:`cli.py` and this module are meant to
    call this one helper so the two call sites cannot drift apart.
    """
    # Short-circuit on falsy input so the regex is never handed None / "".
    return bool(model_name) and bool(
        _NOUS_HERMES_NON_AGENTIC_RE.search(model_name)
    )
|
|
|
|
|
|
def _check_hermes_model_warning(model_name: str) -> str:
    """Return the non-agentic warning for Nous Hermes 3/4 models, else ``""``."""
    return _HERMES_MODEL_WARNING if is_nous_hermes_non_agentic(model_name) else ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Model aliases -- short names -> (vendor, family) with NO version numbers.
|
|
# Resolved dynamically against the live models.dev catalog.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class ModelIdentity(NamedTuple):
    """Vendor slug and family prefix used for catalog resolution.

    Deliberately carries no version number: ``resolve_alias`` matches catalog
    IDs that start with ``family`` (or ``vendor/family`` on aggregator
    providers) and picks the best-ranked one.
    """

    # Vendor slug, e.g. "anthropic"; only used as the "vendor/" part of the
    # prefix when the current provider is an aggregator.
    vendor: str
    # Model-family prefix, e.g. "claude-sonnet".
    family: str
|
|
|
|
|
|
# Short alias -> ModelIdentity.  Keys are looked up lowercase (resolve_alias
# lowercases and strips the user input before the lookup).
MODEL_ALIASES: dict[str, ModelIdentity] = {
    # Anthropic
    "sonnet": ModelIdentity("anthropic", "claude-sonnet"),
    "opus": ModelIdentity("anthropic", "claude-opus"),
    "haiku": ModelIdentity("anthropic", "claude-haiku"),
    "claude": ModelIdentity("anthropic", "claude"),

    # OpenAI
    "gpt5": ModelIdentity("openai", "gpt-5"),
    "gpt": ModelIdentity("openai", "gpt"),
    "codex": ModelIdentity("openai", "codex"),
    "o3": ModelIdentity("openai", "o3"),
    "o4": ModelIdentity("openai", "o4"),

    # Google
    "gemini": ModelIdentity("google", "gemini"),

    # DeepSeek
    "deepseek": ModelIdentity("deepseek", "deepseek-chat"),

    # X.AI
    "grok": ModelIdentity("x-ai", "grok"),

    # Meta
    "llama": ModelIdentity("meta-llama", "llama"),

    # Qwen / Alibaba
    "qwen": ModelIdentity("qwen", "qwen"),

    # MiniMax
    "minimax": ModelIdentity("minimax", "minimax"),

    # Nvidia
    "nemotron": ModelIdentity("nvidia", "nemotron"),

    # Moonshot / Kimi
    "kimi": ModelIdentity("moonshotai", "kimi"),

    # Z.AI / GLM
    "glm": ModelIdentity("z-ai", "glm"),

    # Step Plan (StepFun)
    "step": ModelIdentity("stepfun", "step"),

    # Xiaomi
    "mimo": ModelIdentity("xiaomi", "mimo"),

    # Arcee
    "trinity": ModelIdentity("arcee-ai", "trinity"),
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Direct aliases — exact model+provider+base_url for endpoints that aren't
|
|
# in the models.dev catalog (e.g. Ollama Cloud, local servers).
|
|
# Checked BEFORE catalog resolution. Format:
|
|
# alias -> (model_id, provider, base_url)
|
|
# These can also be loaded from config.yaml ``model_aliases:`` section.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class DirectAlias(NamedTuple):
    """Exact model mapping that bypasses catalog resolution.

    ``resolve_alias`` consults these before ``MODEL_ALIASES``, matching either
    the alias name or the ``model`` value itself.
    """

    # Model ID returned as-is when the alias matches.
    model: str
    # Provider slug returned alongside the model.
    provider: str
    # Endpoint URL; when non-empty, switch_model uses it as the base URL.
    base_url: str
|
|
|
|
|
|
# Built-in direct aliases (can be extended via config.yaml model_aliases:)
_BUILTIN_DIRECT_ALIASES: dict[str, DirectAlias] = {}

# Merged dict (builtins + user config); populated by _load_direct_aliases()
# through _ensure_direct_aliases() the first time an alias is resolved.
DIRECT_ALIASES: dict[str, DirectAlias] = {}
|
|
|
|
|
|
def _load_direct_aliases() -> dict[str, DirectAlias]:
    """Load direct aliases from config.yaml ``model_aliases:`` section.

    Config format::

        model_aliases:
          qwen:
            model: "qwen3.5:397b"
            provider: custom
            base_url: "https://ollama.com/v1"
          minimax:
            model: "minimax-m2.7"
            provider: custom
            base_url: "https://ollama.com/v1"

    Returns:
        A new dict holding the built-in aliases overlaid with every valid
        user entry.  Keys are stripped and lowercased.  Entries that are not
        mappings, have no ``model``, or have an empty name are skipped.  If
        the config cannot be loaded at all, only the built-ins are returned.
    """
    merged = dict(_BUILTIN_DIRECT_ALIASES)

    # Best-effort: a missing or broken config must never block model
    # switching, but leave a trace so the failure is diagnosable.
    try:
        from hermes_cli.config import load_config

        user_aliases = load_config().get("model_aliases")
    except Exception as exc:
        logger.debug("Could not load model_aliases from config: %s", exc)
        return merged

    if not isinstance(user_aliases, dict):
        return merged

    for name, entry in user_aliases.items():
        if not isinstance(entry, dict):
            continue
        model = entry.get("model", "")
        if not model:
            continue
        # YAML may parse a bare key such as ``405`` as a non-string; coerce it
        # so one odd key cannot abort loading of the remaining aliases.
        key = str(name).strip().lower()
        if not key:
            continue
        merged[key] = DirectAlias(
            # Alias lookups call ``.lower()`` on the model, so keep it a str.
            model=str(model),
            # ``provider:`` / ``base_url:`` present but null fall back to the
            # same defaults as when the key is missing.
            provider=entry.get("provider") or "custom",
            base_url=entry.get("base_url") or "",
        )
    return merged
|
|
|
|
|
|
def _ensure_direct_aliases() -> None:
    """Populate ``DIRECT_ALIASES`` on demand.

    The merged table is (re)loaded whenever it is currently empty, so a
    configuration that yields no aliases is re-read on every call.
    """
    global DIRECT_ALIASES
    if DIRECT_ALIASES:
        return
    DIRECT_ALIASES = _load_direct_aliases()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Result dataclasses
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
class ModelSwitchResult:
    """Result of a model switch attempt.

    Built by ``switch_model``.  On failure only ``success``, ``error_message``
    and whatever context was known at the point of failure are filled in; the
    remaining fields keep their defaults.
    """

    # True only for the final, fully resolved result.
    success: bool
    # Model name after alias resolution, normalization and auto-correction.
    new_model: str = ""
    # Provider slug the model was resolved against.
    target_provider: str = ""
    # Whether target_provider differs from the provider that was active.
    provider_changed: bool = False
    # Credentials / endpoint / API mode to use for the new model.
    api_key: str = ""
    base_url: str = ""
    api_mode: str = ""
    # Human-readable failure reason (set when success is False).
    error_message: str = ""
    # Non-fatal notes, joined with " | " when there are several.
    warning_message: str = ""
    # Display name of the target provider.
    provider_label: str = ""
    # Alias name that produced new_model, or "" when no alias was involved.
    resolved_via_alias: str = ""
    # Result of get_model_capabilities() for the new model.
    capabilities: Optional[ModelCapabilities] = None
    # Result of get_model_info() for the new model.
    model_info: Optional[ModelInfo] = None
    # Echo of the caller's is_global argument (whether to persist the switch).
    is_global: bool = False
|
|
|
|
|
|
@dataclass
class CustomAutoResult:
    """Result of switching to bare 'custom' provider with auto-detect."""

    # Whether the switch succeeded.
    success: bool
    # NOTE(review): presumably the auto-detected model and the endpoint /
    # key it was found on — the producer is not visible here; confirm.
    model: str = ""
    base_url: str = ""
    api_key: str = ""
    # Failure description; empty by default.
    error_message: str = ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Flag parsing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def parse_model_flags(raw_args: str) -> tuple[str, str, bool]:
    """Parse --provider and --global flags from /model command args.

    Flags are recognised only as standalone whitespace-separated tokens, so a
    model name that merely contains the text ``--global`` is left intact.
    If ``--provider`` is given more than once the last value wins; a trailing
    ``--provider`` with no value is discarded instead of leaking into the
    model name.

    Args:
        raw_args: Everything the user typed after ``/model``.

    Returns:
        ``(model_input, explicit_provider, is_global)`` where *model_input* is
        the remaining tokens joined by single spaces.

    Examples::

        "sonnet"                              -> ("sonnet", "", False)
        "sonnet --global"                     -> ("sonnet", "", True)
        "sonnet --provider anthropic"         -> ("sonnet", "anthropic", False)
        "--provider my-ollama"                -> ("", "my-ollama", False)
        "sonnet --provider anthropic --global" -> ("sonnet", "anthropic", True)
        "sonnet --provider"                   -> ("sonnet", "", False)
    """
    # Normalize Unicode dashes (Telegram/iOS auto-converts -- to em/en dash).
    # A single Unicode dash before a flag keyword becomes "--".
    normalized = re.sub(
        r"[\u2012\u2013\u2014\u2015](provider|global)", r"--\1", raw_args
    )
    tokens = normalized.split()

    # --global is a plain switch: note its presence, then drop every copy so
    # it can never be mistaken for the value of --provider.
    is_global = "--global" in tokens
    remaining = iter([tok for tok in tokens if tok != "--global"])

    explicit_provider = ""
    model_parts: list[str] = []
    for tok in remaining:
        if tok == "--provider":
            # Consume the following token as the provider name, if any.
            value = next(remaining, None)
            if value is not None:
                explicit_provider = value
            continue
        model_parts.append(tok)

    return (" ".join(model_parts), explicit_provider, is_global)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Alias resolution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _model_sort_key(model_id: str, prefix: str) -> tuple:
|
|
"""Sort key for model version preference.
|
|
|
|
Extracts version numbers after the family prefix and returns a sort key
|
|
that prefers higher versions. Suffix tokens (``pro``, ``omni``, etc.)
|
|
are used as tiebreakers, with common quality indicators ranked.
|
|
|
|
Examples (with prefix ``"mimo"``)::
|
|
|
|
mimo-v2.5-pro → (-2.5, 0, 'pro') # highest version wins
|
|
mimo-v2.5 → (-2.5, 1, '') # no suffix = lower than pro
|
|
mimo-v2-pro → (-2.0, 0, 'pro')
|
|
mimo-v2-omni → (-2.0, 1, 'omni')
|
|
mimo-v2-flash → (-2.0, 1, 'flash')
|
|
"""
|
|
# Strip the prefix (and optional "/" separator for aggregator slugs)
|
|
rest = model_id[len(prefix):]
|
|
if rest.startswith("/"):
|
|
rest = rest[1:]
|
|
rest = rest.lstrip("-").strip()
|
|
|
|
# Parse version and suffix from the remainder.
|
|
# "v2.5-pro" → version [2.5], suffix "pro"
|
|
# "-omni" → version [], suffix "omni"
|
|
# State machine: start → in_version → between → in_suffix
|
|
nums: list[float] = []
|
|
suffix_buf = ""
|
|
state = "start"
|
|
num_buf = ""
|
|
|
|
for ch in rest:
|
|
if state == "start":
|
|
if ch in "vV":
|
|
state = "in_version"
|
|
elif ch.isdigit():
|
|
state = "in_version"
|
|
num_buf += ch
|
|
elif ch in "-_.":
|
|
pass # skip separators before any content
|
|
else:
|
|
state = "in_suffix"
|
|
suffix_buf += ch
|
|
elif state == "in_version":
|
|
if ch.isdigit():
|
|
num_buf += ch
|
|
elif ch == ".":
|
|
if "." in num_buf:
|
|
# Second dot — flush current number, start new component
|
|
try:
|
|
nums.append(float(num_buf.rstrip(".")))
|
|
except ValueError:
|
|
pass
|
|
num_buf = ""
|
|
else:
|
|
num_buf += ch
|
|
elif ch in "-_.":
|
|
if num_buf:
|
|
try:
|
|
nums.append(float(num_buf.rstrip(".")))
|
|
except ValueError:
|
|
pass
|
|
num_buf = ""
|
|
state = "between"
|
|
else:
|
|
if num_buf:
|
|
try:
|
|
nums.append(float(num_buf.rstrip(".")))
|
|
except ValueError:
|
|
pass
|
|
num_buf = ""
|
|
state = "in_suffix"
|
|
suffix_buf += ch
|
|
elif state == "between":
|
|
if ch.isdigit():
|
|
state = "in_version"
|
|
num_buf = ch
|
|
elif ch in "vV":
|
|
state = "in_version"
|
|
elif ch in "-_.":
|
|
pass
|
|
else:
|
|
state = "in_suffix"
|
|
suffix_buf += ch
|
|
elif state == "in_suffix":
|
|
suffix_buf += ch
|
|
|
|
# Flush remaining buffer (strip trailing dots — "5.4." → "5.4")
|
|
if num_buf and state == "in_version":
|
|
try:
|
|
nums.append(float(num_buf.rstrip(".")))
|
|
except ValueError:
|
|
pass
|
|
|
|
suffix = suffix_buf.lower().strip("-_.")
|
|
suffix = suffix.strip()
|
|
|
|
# Negate versions so higher → sorts first
|
|
version_key = tuple(-n for n in nums)
|
|
|
|
# Suffix quality ranking: pro/max > (no suffix) > omni/flash/mini/lite
|
|
# Lower number = preferred
|
|
_SUFFIX_RANK = {"pro": 0, "max": 0, "plus": 0, "turbo": 0}
|
|
suffix_rank = _SUFFIX_RANK.get(suffix, 1)
|
|
|
|
return version_key + (suffix_rank, suffix)
|
|
|
|
|
|
def resolve_alias(
    raw_input: str,
    current_provider: str,
) -> Optional[tuple[str, str, str]]:
    """Turn a short alias into a concrete model on *current_provider*.

    Resolution order:

    1. Direct aliases, matched by alias name.
    2. Direct aliases, matched by their model ID (so a full name such as
       "kimi-k2.5" or "glm-4.7" still routes through the direct alias).
    3. :data:`MODEL_ALIASES`: search the provider's models.dev catalog
       (plus any static ``_PROVIDER_MODELS`` entries it lacks) for IDs that
       start with ``vendor/family`` on aggregators or ``family`` otherwise,
       and take the one ranked first by :func:`_model_sort_key`.

    Returns:
        ``(provider, resolved_model_id, alias_name)``, or ``None`` when the
        input is not a known alias or the provider offers no matching model.
    """
    alias_key = raw_input.strip().lower()

    # --- Direct aliases (exact model+provider+base_url mappings) ---
    _ensure_direct_aliases()
    exact = DIRECT_ALIASES.get(alias_key)
    if exact is not None:
        return (exact.provider, exact.model, alias_key)

    reverse_hit = next(
        (
            (entry.provider, entry.model, name)
            for name, entry in DIRECT_ALIASES.items()
            if entry.model.lower() == alias_key
        ),
        None,
    )
    if reverse_hit is not None:
        return reverse_hit

    # --- Catalog-backed aliases ---
    identity = MODEL_ALIASES.get(alias_key)
    if identity is None:
        return None
    vendor, family = identity

    # models.dev first, then top up with statically known models that the
    # registry may not list yet.  Best-effort: failures here are ignored.
    catalog = list_provider_models(current_provider)
    try:
        from hermes_cli.models import _PROVIDER_MODELS

        extras = _PROVIDER_MODELS.get(current_provider, [])
        if extras:
            already_listed = {mid.lower() for mid in catalog}
            catalog.extend(
                mid for mid in extras if mid.lower() not in already_listed
            )
    except Exception:
        pass

    # Aggregators publish IDs as "vendor/model-name"; direct providers use
    # the bare model name.
    on_aggregator = is_aggregator(current_provider)
    sort_prefix = f"{vendor}/{family}" if on_aggregator else family
    needle = sort_prefix.lower()

    candidates = [mid for mid in catalog if mid.lower().startswith(needle)]
    if not candidates:
        return None

    # Ascending sort on the (negated-version) key puts the newest first.
    ranked = sorted(candidates, key=lambda mid: _model_sort_key(mid, sort_prefix))
    return (current_provider, ranked[0], alias_key)
|
|
|
|
|
|
def get_authenticated_provider_slugs(
    current_provider: str = "",
    user_providers: dict = None,
    custom_providers: list | None = None,
) -> list[str]:
    """Collect the slugs of every provider that has credentials.

    Thin wrapper over ``list_authenticated_providers()`` called with
    ``max_models=0``; per the original note it is backed by the models.dev
    in-memory cache (1 hr TTL), so no extra network cost.  Any failure while
    listing or reading the entries yields an empty list.
    """
    slugs: list[str] = []
    try:
        entries = list_authenticated_providers(
            current_provider=current_provider,
            user_providers=user_providers,
            custom_providers=custom_providers,
            max_models=0,
        )
        for entry in entries:
            slugs.append(entry["slug"])
    except Exception:
        return []
    return slugs
|
|
|
|
|
|
def _resolve_alias_fallback(
    raw_input: str,
    authenticated_providers: list[str] = (),
) -> Optional[tuple[str, str, str]]:
    """Resolve an alias against the user's authenticated providers, in order.

    When no authenticated providers are supplied the search falls back to
    ``("openrouter", "nous")`` (backwards compat for non-interactive
    callers).  The first provider that yields a match wins; ``None`` means
    none of them did.
    """
    candidates = authenticated_providers or ("openrouter", "nous")
    # Lazy generator: providers after the first hit are never queried.
    attempts = (resolve_alias(raw_input, slug) for slug in candidates)
    return next((hit for hit in attempts if hit is not None), None)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Core model-switching pipeline
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def switch_model(
    raw_input: str,
    current_provider: str,
    current_model: str,
    current_base_url: str = "",
    current_api_key: str = "",
    is_global: bool = False,
    explicit_provider: str = "",
    user_providers: dict = None,
    custom_providers: list | None = None,
) -> ModelSwitchResult:
    """Core model-switching pipeline shared between CLI and gateway.

    Resolution chain:

    If --provider given:
        a. Resolve provider via resolve_provider_full()
        b. Resolve credentials
        c. If model given, resolve alias on target provider or use as-is
        d. If no model, auto-detect from endpoint

    If no --provider:
        a. Try alias resolution on current provider
        b. If alias exists but not on current provider -> fallback
        c. On aggregator, try vendor/model slug conversion
        d. Aggregator catalog search
        e. detect_provider_for_model() as last resort
        f. Resolve credentials
        g. Normalize model name for target provider

    Finally:
        h. Get full model metadata from models.dev
        i. Build result

    Args:
        raw_input: The model name (after flag parsing).
        current_provider: The currently active provider.
        current_model: The currently active model name.
        current_base_url: The currently active base URL.
        current_api_key: The currently active API key.
        is_global: Whether to persist the switch.
        explicit_provider: From --provider flag (empty = no explicit provider).
        user_providers: The ``providers:`` dict from config.yaml (for user endpoints).
        custom_providers: The ``custom_providers:`` list from config.yaml.

    Returns:
        ModelSwitchResult with all information the caller needs.  Failures
        are reported through ``success=False`` and ``error_message`` rather
        than by raising.
    """
    # Imported lazily inside the function rather than at module top level.
    from hermes_cli.models import (
        copilot_model_api_mode,
        detect_provider_for_model,
        validate_requested_model,
        opencode_model_api_mode,
    )
    from hermes_cli.runtime_provider import resolve_runtime_provider

    resolved_alias = ""
    new_model = raw_input.strip()
    target_provider = current_provider

    # =================================================================
    # PATH A: Explicit --provider given
    # =================================================================
    if explicit_provider:
        # Resolve the provider
        pdef = resolve_provider_full(
            explicit_provider,
            user_providers,
            custom_providers,
        )
        if pdef is None:
            _switch_err = (
                f"Unknown provider '{explicit_provider}'. "
                f"Check 'hermes model' for available providers, or define it "
                f"in config.yaml under 'providers:'."
            )
            # Check for common config issues that cause provider resolution failures
            try:
                from hermes_cli.config import validate_config_structure
                _cfg_issues = validate_config_structure()
                if _cfg_issues:
                    _switch_err += "\n\nRun 'hermes doctor' — config issues detected:"
                    # Only the first three issues are appended to the message.
                    for _ci in _cfg_issues[:3]:
                        _switch_err += f"\n • {_ci.message}"
            except Exception:
                # Diagnostics are optional; the base error is still returned.
                pass
            return ModelSwitchResult(
                success=False,
                is_global=is_global,
                error_message=_switch_err,
            )

        target_provider = pdef.id

        # If no model specified, try auto-detect from endpoint
        if not new_model:
            if pdef.base_url:
                from hermes_cli.runtime_provider import _auto_detect_local_model
                detected = _auto_detect_local_model(pdef.base_url)
                if detected:
                    new_model = detected
                else:
                    return ModelSwitchResult(
                        success=False,
                        target_provider=target_provider,
                        provider_label=pdef.name,
                        is_global=is_global,
                        error_message=(
                            f"No model detected on {pdef.name} ({pdef.base_url}). "
                            f"Specify the model explicitly: /model <model-name> --provider {explicit_provider}"
                        ),
                    )
            else:
                return ModelSwitchResult(
                    success=False,
                    target_provider=target_provider,
                    provider_label=pdef.name,
                    is_global=is_global,
                    error_message=(
                        f"Provider '{pdef.name}' has no base URL configured. "
                        f"Specify a model: /model <model-name> --provider {explicit_provider}"
                    ),
                )

        # Resolve alias on the TARGET provider
        alias_result = resolve_alias(new_model, target_provider)
        if alias_result is not None:
            # The provider element of the alias result is discarded here: the
            # explicitly requested provider stays the target.
            _, new_model, resolved_alias = alias_result

    # =================================================================
    # PATH B: No explicit provider — resolve from model input
    # =================================================================
    else:
        # --- Step a: Try alias resolution on current provider ---
        alias_result = resolve_alias(raw_input, current_provider)

        if alias_result is not None:
            target_provider, new_model, resolved_alias = alias_result
            logger.debug(
                "Alias '%s' resolved to %s on %s",
                resolved_alias, new_model, target_provider,
            )
        else:
            # --- Step b: Alias exists but not on current provider -> fallback ---
            key = raw_input.strip().lower()
            if key in MODEL_ALIASES:
                authed = get_authenticated_provider_slugs(
                    current_provider=current_provider,
                    user_providers=user_providers,
                    custom_providers=custom_providers,
                )
                fallback_result = _resolve_alias_fallback(raw_input, authed)
                if fallback_result is not None:
                    target_provider, new_model, resolved_alias = fallback_result
                    logger.debug(
                        "Alias '%s' resolved via fallback to %s on %s",
                        resolved_alias, new_model, target_provider,
                    )
                else:
                    identity = MODEL_ALIASES[key]
                    return ModelSwitchResult(
                        success=False,
                        is_global=is_global,
                        error_message=(
                            f"Alias '{key}' maps to {identity.vendor}/{identity.family} "
                            f"but no matching model was found in any provider catalog. "
                            f"Try specifying the full model name."
                        ),
                    )
            else:
                # --- Step c: On aggregator, convert vendor:model to vendor/model ---
                # Only convert when there's no slash — a slash means the name
                # is already in vendor/model format and the colon is a variant
                # tag (:free, :extended, :fast) that must be preserved.
                colon_pos = raw_input.find(":")
                if colon_pos > 0 and "/" not in raw_input and is_aggregator(current_provider):
                    left = raw_input[:colon_pos].strip().lower()
                    right = raw_input[colon_pos + 1:].strip()
                    if left and right:
                        # Colons become slashes for aggregator slugs
                        new_model = f"{left}/{right}"
                        logger.debug(
                            "Converted vendor:model '%s' to aggregator slug '%s'",
                            raw_input, new_model,
                        )

        # --- Step d: Aggregator catalog search ---
        if is_aggregator(target_provider) and not resolved_alias:
            catalog = list_provider_models(target_provider)
            if catalog:
                new_model_lower = new_model.lower()
                # First pass: case-insensitive match on the full catalog ID.
                for mid in catalog:
                    if mid.lower() == new_model_lower:
                        new_model = mid
                        break
                else:
                    # Second pass (only when the first found nothing): match
                    # on the part after "vendor/".
                    for mid in catalog:
                        if "/" in mid:
                            _, bare = mid.split("/", 1)
                            if bare.lower() == new_model_lower:
                                new_model = mid
                                break

        # --- Step e: detect_provider_for_model() as last resort ---
        _base = current_base_url or ""
        is_custom = current_provider in ("custom", "local") or (
            "localhost" in _base or "127.0.0.1" in _base
        )

        # Only attempt detection when nothing above moved the provider, the
        # current endpoint is not custom/local, and no alias was resolved.
        if (
            target_provider == current_provider
            and not is_custom
            and not resolved_alias
        ):
            detected = detect_provider_for_model(new_model, current_provider)
            if detected:
                target_provider, new_model = detected

    # =================================================================
    # COMMON PATH: Resolve credentials, normalize, get metadata
    # =================================================================

    provider_changed = target_provider != current_provider
    provider_label = get_label(target_provider)
    if target_provider.startswith("custom:"):
        # For "custom:" slugs, prefer the resolved definition's own name.
        custom_pdef = resolve_provider_full(
            target_provider,
            user_providers,
            custom_providers,
        )
        if custom_pdef is not None:
            provider_label = custom_pdef.name

    # --- Resolve credentials ---
    api_key = current_api_key
    base_url = current_base_url
    api_mode = ""

    if provider_changed or explicit_provider:
        # Switching providers: a credential failure is fatal.
        try:
            runtime = resolve_runtime_provider(
                requested=target_provider,
                target_model=new_model,
            )
            api_key = runtime.get("api_key", "")
            base_url = runtime.get("base_url", "")
            api_mode = runtime.get("api_mode", "")
        except Exception as e:
            return ModelSwitchResult(
                success=False,
                target_provider=target_provider,
                provider_label=provider_label,
                is_global=is_global,
                error_message=(
                    f"Could not resolve credentials for provider "
                    f"'{provider_label}': {e}"
                ),
            )
    else:
        # Same provider: re-resolve for the new model, but on failure keep the
        # credentials that were passed in.
        try:
            runtime = resolve_runtime_provider(
                requested=current_provider,
                target_model=new_model,
            )
            api_key = runtime.get("api_key", "")
            base_url = runtime.get("base_url", "")
            api_mode = runtime.get("api_mode", "")
        except Exception:
            pass

    # --- Direct alias override: use exact base_url from the alias if set ---
    if resolved_alias:
        _ensure_direct_aliases()
        _da = DIRECT_ALIASES.get(resolved_alias)
        if _da is not None and _da.base_url:
            base_url = _da.base_url
            api_mode = ""  # clear so determine_api_mode re-detects from URL
            if not api_key:
                # Placeholder used when no key was resolved for the endpoint.
                api_key = "no-key-required"

    # --- Normalize model name for target provider ---
    new_model = normalize_model_for_provider(new_model, target_provider)

    # --- Validate ---
    try:
        validation = validate_requested_model(
            new_model,
            target_provider,
            api_key=api_key,
            base_url=base_url,
        )
    except Exception as e:
        # A crashing validator is treated as a rejection.
        validation = {
            "accepted": False,
            "persist": False,
            "recognized": False,
            "message": f"Could not validate `{new_model}`: {e}",
        }

    if not validation.get("accepted"):
        msg = validation.get("message", "Invalid model")
        return ModelSwitchResult(
            success=False,
            new_model=new_model,
            target_provider=target_provider,
            provider_label=provider_label,
            is_global=is_global,
            error_message=msg,
        )

    # Apply auto-correction if validation found a closer match
    if validation.get("corrected_model"):
        new_model = validation["corrected_model"]

    # --- Copilot api_mode override ---
    if target_provider in {"copilot", "github-copilot"}:
        api_mode = copilot_model_api_mode(new_model, api_key=api_key)

    # --- OpenCode api_mode override ---
    if target_provider in {"opencode-zen", "opencode-go", "opencode"}:
        api_mode = opencode_model_api_mode(target_provider, new_model)

    # --- Determine api_mode if not already set ---
    if not api_mode:
        api_mode = determine_api_mode(target_provider, base_url)

    # OpenCode base URLs end with /v1 for OpenAI-compatible models, but the
    # Anthropic SDK prepends its own /v1/messages to the base_url.  Strip the
    # trailing /v1 so the SDK constructs the correct path (e.g.
    # https://opencode.ai/zen/go/v1/messages instead of .../v1/v1/messages).
    # Mirrors the same logic in hermes_cli.runtime_provider.resolve_runtime_provider;
    # without it, /model switches into an anthropic_messages-routed OpenCode
    # model (e.g. `/model minimax-m2.7` on opencode-go, `/model claude-sonnet-4-6`
    # on opencode-zen) hit a double /v1 and returned OpenCode's website 404 page.
    if (
        api_mode == "anthropic_messages"
        and target_provider in {"opencode-zen", "opencode-go"}
        and isinstance(base_url, str)
        and base_url
    ):
        base_url = re.sub(r"/v1/?$", "", base_url)

    # --- Get capabilities (legacy) ---
    capabilities = get_model_capabilities(target_provider, new_model)

    # --- Get full model info from models.dev ---
    model_info = get_model_info(target_provider, new_model)

    # --- Collect warnings ---
    warnings: list[str] = []
    # An accepted validation may still carry a message; surface it as a warning.
    if validation.get("message"):
        warnings.append(validation["message"])
    hermes_warn = _check_hermes_model_warning(new_model)
    if hermes_warn:
        warnings.append(hermes_warn)

    # --- Build result ---
    return ModelSwitchResult(
        success=True,
        new_model=new_model,
        target_provider=target_provider,
        provider_changed=provider_changed,
        api_key=api_key,
        base_url=base_url,
        api_mode=api_mode,
        warning_message=" | ".join(warnings) if warnings else "",
        provider_label=provider_label,
        resolved_via_alias=resolved_alias,
        capabilities=capabilities,
        model_info=model_info,
        is_global=is_global,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Authenticated providers listing (for /model no-args display)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def list_authenticated_providers(
    current_provider: str = "",
    current_base_url: str = "",
    user_providers: dict | None = None,
    custom_providers: list | None = None,
    max_models: int = 8,
) -> List[dict]:
    """Detect which providers have credentials and list their curated models.

    Uses the curated model lists from hermes_cli/models.py (OPENROUTER_MODELS,
    _PROVIDER_MODELS) — NOT the full models.dev catalog.  For providers in
    ``_MODELS_DEV_PREFERRED`` the curated list is merged with models.dev via
    ``_merge_with_models_dev``.

    Args:
        current_provider: Slug of the active provider; the matching row gets
            ``is_current=True`` and is sorted first.
        current_base_url: Base URL of the active endpoint.  A
            ``custom_providers`` group whose URL matches it (after stripping
            whitespace and trailing ``/``) reuses ``current_provider`` as its
            slug.
        user_providers: Mapping of endpoint name -> config dict (the
            ``providers:`` config section).  Non-dict values are ignored.
        custom_providers: List of saved custom-provider entry dicts.  Entries
            sharing the same ``(base_url, api_key)`` are grouped into one row.
        max_models: Maximum number of model IDs listed per built-in provider
            row.  User-defined rows list every configured model.

    Returns:
        A list of dicts sorted with the current provider first and then by
        ``total_models`` descending.  Each dict has:

        - slug: str — the --provider value to use
        - name: str — display name
        - is_current: bool
        - is_user_defined: bool
        - models: list[str] — model IDs (truncated to ``max_models`` for
          built-in rows only)
        - total_models: int — total model count before truncation
        - source: str — "built-in", "hermes", "canonical" or "user-config"
        - api_url: str — only present on "user-config" rows

    Only providers with detected credentials (environment variables, the
    auth store, the credential pool, or external Anthropic credential files)
    and user-defined endpoints are included.
    """
    import os
    from agent.models_dev import (
        PROVIDER_TO_MODELS_DEV,
        fetch_models_dev,
        get_provider_info as _mdev_pinfo,
    )
    from hermes_cli.auth import PROVIDER_REGISTRY
    from hermes_cli.models import (
        OPENROUTER_MODELS, _PROVIDER_MODELS,
        _MODELS_DEV_PREFERRED, _merge_with_models_dev, provider_model_ids,
    )

    def _env_has_any(env_vars) -> bool:
        """Return True if any of the named environment variables is non-empty."""
        return any(os.environ.get(ev) for ev in env_vars)

    def _auth_store_has(slugs: tuple, *, include_providers: bool) -> bool:
        """Return True if any slug has an entry in the Hermes auth store.

        Looks in ``credential_pool`` and, when *include_providers* is set,
        in ``providers`` first.  Best-effort: any failure is logged at debug
        level and treated as "no credentials".
        """
        try:
            # Imported per call (inside the try) so a missing/broken auth
            # module degrades to "no credentials" instead of raising.
            from hermes_cli.auth import _load_auth_store
            store = _load_auth_store()
            if not store:
                return False
            sections = []
            if include_providers:
                sections.append(store.get("providers", {}))
            sections.append(store.get("credential_pool", {}))
            return any(slug in section for section in sections for slug in slugs)
        except Exception as exc:
            logger.debug("Auth store check failed for %s: %s", slugs[0], exc)
            return False

    def _pool_has_credentials(slug: str) -> bool:
        """Return True if the credential pool for *slug* reports credentials.

        Best-effort: any failure is logged at debug level and treated as
        "no credentials".
        """
        try:
            from agent.credential_pool import load_pool
            return bool(load_pool(slug).has_credentials())
        except Exception as exc:
            logger.debug("Credential pool check failed for %s: %s", slug, exc)
            return False

    def _extend_unique(target: list, cfg_models) -> None:
        """Append truthy, not-yet-present model IDs from a ``models:`` value.

        Hermes writes ``models:`` as a dict keyed by model id; older or
        hand-edited configs may use a list.  Iterating either yields the IDs.
        Any other type is ignored.
        """
        if isinstance(cfg_models, (dict, list)):
            for model_id in cfg_models:
                if model_id and model_id not in target:
                    target.append(model_id)

    results: List[dict] = []
    seen_slugs: set = set()  # lowercase-normalized to catch case variants (#9545)
    seen_mdev_ids: set = set()  # prevent duplicate entries for aliases (e.g. kimi-coding + kimi-coding-cn)

    data = fetch_models_dev()

    # Build curated model lists keyed by hermes provider ID
    curated: dict[str, list[str]] = dict(_PROVIDER_MODELS)
    curated["openrouter"] = [mid for mid, _ in OPENROUTER_MODELS]
    # "nous" shares OpenRouter's curated list if not separately defined
    if "nous" not in curated:
        curated["nous"] = curated["openrouter"]
    # Ollama Cloud uses dynamic discovery (no static curated list)
    if "ollama-cloud" not in curated:
        from hermes_cli.models import fetch_ollama_cloud_models
        curated["ollama-cloud"] = fetch_ollama_cloud_models()

    # --- 1. Check Hermes-mapped providers ---
    for hermes_id, mdev_id in PROVIDER_TO_MODELS_DEV.items():
        # Skip aliases that map to the same models.dev provider (e.g.
        # kimi-coding and kimi-coding-cn both → kimi-for-coding).
        # The first one with valid credentials wins (#10526).
        if mdev_id in seen_mdev_ids:
            continue
        pdata = data.get(mdev_id)
        if not isinstance(pdata, dict):
            continue

        # Prefer auth.py PROVIDER_REGISTRY for env var names — it's our
        # source of truth.  models.dev can have wrong mappings (e.g.
        # minimax-cn → MINIMAX_API_KEY instead of MINIMAX_CN_API_KEY).
        pconfig = PROVIDER_REGISTRY.get(hermes_id)
        # Skip non-API-key auth providers here — they are handled in
        # section 2 (HERMES_OVERLAYS) with proper auth store checking.
        if pconfig and pconfig.auth_type != "api_key":
            continue
        if pconfig and pconfig.api_key_env_vars:
            env_vars = list(pconfig.api_key_env_vars)
        else:
            env_vars = pdata.get("env", [])
            if not isinstance(env_vars, list):
                continue

        # Env vars first; otherwise accept a credential-pool entry in the
        # auth store for this provider.
        has_creds = _env_has_any(env_vars) or _auth_store_has(
            (hermes_id,), include_providers=False
        )
        if not has_creds:
            continue

        # Use the curated list (empty when the provider has none).
        # For preferred providers, merge models.dev entries into the curated
        # catalog so newly released models (e.g. mimo-v2.5-pro on opencode-go)
        # show up in the picker without requiring a Hermes release.
        model_ids = curated.get(hermes_id, [])
        if hermes_id in _MODELS_DEV_PREFERRED:
            model_ids = _merge_with_models_dev(hermes_id, model_ids)
        total = len(model_ids)
        top = model_ids[:max_models]

        slug = hermes_id
        pinfo = _mdev_pinfo(mdev_id)
        display_name = pinfo.name if pinfo else mdev_id

        results.append({
            "slug": slug,
            "name": display_name,
            "is_current": slug == current_provider or mdev_id == current_provider,
            "is_user_defined": False,
            "models": top,
            "total_models": total,
            "source": "built-in",
        })
        seen_slugs.add(slug.lower())
        seen_mdev_ids.add(mdev_id)

    # --- 2. Check Hermes-only providers (nous, openai-codex, copilot, opencode-go) ---
    from hermes_cli.providers import HERMES_OVERLAYS

    # Build reverse mapping: models.dev ID → Hermes provider ID.
    # HERMES_OVERLAYS keys may be models.dev IDs (e.g. "github-copilot")
    # while _PROVIDER_MODELS and config.yaml use Hermes IDs ("copilot").
    _mdev_to_hermes = {v: k for k, v in PROVIDER_TO_MODELS_DEV.items()}

    for pid, overlay in HERMES_OVERLAYS.items():
        if pid.lower() in seen_slugs:
            continue

        # Resolve Hermes slug — e.g. "github-copilot" → "copilot"
        hermes_slug = _mdev_to_hermes.get(pid, pid)
        if hermes_slug.lower() in seen_slugs:
            continue

        # Check if credentials exist
        has_creds = False
        if overlay.extra_env_vars:
            has_creds = _env_has_any(overlay.extra_env_vars)
        # Also check api_key_env_vars from PROVIDER_REGISTRY for api_key auth_type
        if not has_creds and overlay.auth_type == "api_key":
            for _key in (pid, hermes_slug):
                pcfg = PROVIDER_REGISTRY.get(_key)
                if pcfg and pcfg.api_key_env_vars and _env_has_any(pcfg.api_key_env_vars):
                    has_creds = True
                    break
        # Check auth store and credential pool for non-env-var credentials.
        # This applies to OAuth providers AND api_key providers that also
        # support OAuth (e.g. anthropic supports both API key and Claude Code
        # OAuth via external credential files).
        if not has_creds:
            has_creds = _auth_store_has((pid, hermes_slug), include_providers=True)
        # Fallback: check the credential pool with full auto-seeding.
        # This catches credentials that exist in external stores (e.g.
        # Codex CLI ~/.codex/auth.json) which _seed_from_singletons()
        # imports on demand but aren't in the raw auth.json yet.
        if not has_creds:
            has_creds = _pool_has_credentials(hermes_slug)
        # Fallback: check external credential files directly.
        # The credential pool gates anthropic behind
        # is_provider_explicitly_configured() to prevent auxiliary tasks
        # from silently consuming Claude Code tokens (PR #4210).
        # But the /model picker is discovery-oriented — we WANT to show
        # providers the user can switch to, even if they aren't currently
        # configured.
        if not has_creds and hermes_slug == "anthropic":
            try:
                from agent.anthropic_adapter import (
                    read_claude_code_credentials,
                    read_hermes_oauth_credentials,
                )
                hermes_creds = read_hermes_oauth_credentials()
                cc_creds = read_claude_code_credentials()
                if (hermes_creds and hermes_creds.get("accessToken")) or \
                        (cc_creds and cc_creds.get("accessToken")):
                    has_creds = True
            except Exception as exc:
                logger.debug("Anthropic external creds check failed: %s", exc)
        if not has_creds:
            continue

        if hermes_slug in {"copilot", "copilot-acp"}:
            model_ids = provider_model_ids(hermes_slug)
        else:
            # Use curated list — look up by Hermes slug, fall back to overlay key
            model_ids = curated.get(hermes_slug, []) or curated.get(pid, [])
            # Merge with models.dev for preferred providers (same rationale as above).
            if hermes_slug in _MODELS_DEV_PREFERRED:
                model_ids = _merge_with_models_dev(hermes_slug, model_ids)
        total = len(model_ids)
        top = model_ids[:max_models]

        results.append({
            "slug": hermes_slug,
            "name": get_label(hermes_slug),
            "is_current": hermes_slug == current_provider or pid == current_provider,
            "is_user_defined": False,
            "models": top,
            "total_models": total,
            "source": "hermes",
        })
        seen_slugs.add(pid.lower())
        seen_slugs.add(hermes_slug.lower())

    # --- 2b. Cross-check canonical provider list ---
    # Catches providers that are in CANONICAL_PROVIDERS but weren't found
    # in PROVIDER_TO_MODELS_DEV or HERMES_OVERLAYS (keeps /model in sync
    # with `hermes model`).
    try:
        from hermes_cli.models import CANONICAL_PROVIDERS as _canon_provs
    except ImportError:
        _canon_provs = []

    for _cp in _canon_provs:
        if _cp.slug.lower() in seen_slugs:
            continue

        # Check credentials via PROVIDER_REGISTRY (auth.py), then the auth
        # store, then the credential pool.  ``or`` short-circuits so later
        # probes only run when earlier ones found nothing.
        _cp_config = PROVIDER_REGISTRY.get(_cp.slug)
        _cp_has_creds = (
            bool(
                _cp_config
                and _cp_config.api_key_env_vars
                and _env_has_any(_cp_config.api_key_env_vars)
            )
            or _auth_store_has((_cp.slug,), include_providers=True)
            or _pool_has_credentials(_cp.slug)
        )
        if not _cp_has_creds:
            continue

        _cp_model_ids = curated.get(_cp.slug, [])
        _cp_total = len(_cp_model_ids)
        _cp_top = _cp_model_ids[:max_models]

        results.append({
            "slug": _cp.slug,
            "name": _cp.label,
            "is_current": _cp.slug == current_provider,
            "is_user_defined": False,
            "models": _cp_top,
            "total_models": _cp_total,
            "source": "canonical",
        })
        seen_slugs.add(_cp.slug.lower())

    # --- 3. User-defined endpoints from config ---
    # Track (name, base_url) of what section 3 emits so section 4 can skip
    # any overlapping ``custom_providers:`` entries.  Callers typically pass
    # both (gateway/CLI invoke ``get_compatible_custom_providers()`` which
    # merges ``providers:`` into the list) — without this, the same endpoint
    # produces two picker rows: one bare-slug ("openrouter") from section 3
    # and one "custom:openrouter" from section 4, both labelled identically.
    _section3_emitted_pairs: set = set()
    if user_providers and isinstance(user_providers, dict):
        for ep_name, ep_cfg in user_providers.items():
            if not isinstance(ep_cfg, dict):
                continue
            # Skip if this slug was already emitted (e.g. canonical provider
            # with the same name) or will be picked up by section 4.
            if ep_name.lower() in seen_slugs:
                continue
            display_name = ep_cfg.get("name", "") or ep_name
            # ``base_url`` is Hermes's canonical write key (matches
            # custom_providers and _save_custom_provider); ``api`` / ``url``
            # remain as fallbacks for hand-edited / legacy configs.
            api_url = (
                ep_cfg.get("base_url", "")
                or ep_cfg.get("api", "")
                or ep_cfg.get("url", "")
                or ""
            )
            # ``default_model`` is the legacy key; ``model`` matches what
            # custom_providers entries use, so accept either.
            default_model = ep_cfg.get("default_model", "") or ep_cfg.get("model", "")

            # Build models list from both default_model and full models array;
            # the default model (if any) is listed first.
            models_list = []
            if default_model:
                models_list.append(default_model)
            _extend_unique(models_list, ep_cfg.get("models", []))

            # No /v1/models probing here — only what the config declares.
            results.append({
                "slug": ep_name,
                "name": display_name,
                "is_current": ep_name == current_provider,
                "is_user_defined": True,
                "models": models_list,
                "total_models": len(models_list),
                "source": "user-config",
                "api_url": api_url,
            })
            seen_slugs.add(ep_name.lower())
            seen_slugs.add(custom_provider_slug(display_name).lower())
            _pair = (
                str(display_name).strip().lower(),
                str(api_url).strip().rstrip("/").lower(),
            )
            if _pair[0] and _pair[1]:
                _section3_emitted_pairs.add(_pair)

    # --- 4. Saved custom providers from config ---
    # Each ``custom_providers`` entry represents one model under a named
    # provider.  Entries sharing the same endpoint (``base_url`` + ``api_key``)
    # are grouped into a single picker row, so e.g. four Ollama entries
    # pointing at ``http://localhost:11434/v1`` with per-model display names
    # ("Ollama — GLM 5.1", "Ollama — Qwen3-coder", ...) appear as one
    # "Ollama" row with four models inside instead of four near-duplicates
    # that differ only by suffix.  Entries with distinct endpoints still
    # produce separate rows.
    #
    # When the grouped endpoint matches ``current_base_url`` the group's
    # slug becomes ``current_provider`` so that selecting a model from the
    # picker flows back through the runtime provider that already holds
    # valid credentials — no re-resolution needed.
    if custom_providers and isinstance(custom_providers, list):
        from collections import OrderedDict

        # Key by (base_url, api_key) instead of slug: names frequently
        # differ per model ("Ollama — X") while the endpoint stays the
        # same.  Slug-based grouping left them as separate rows.
        groups: "OrderedDict[tuple, dict]" = OrderedDict()
        for entry in custom_providers:
            if not isinstance(entry, dict):
                continue

            raw_name = (entry.get("name") or "").strip()
            api_url = (
                entry.get("base_url", "")
                or entry.get("url", "")
                or entry.get("api", "")
                or ""
            ).strip().rstrip("/")
            if not raw_name or not api_url:
                continue
            api_key = (entry.get("api_key") or "").strip()

            group_key = (api_url, api_key)
            if group_key not in groups:
                # Strip per-model suffix so "Ollama — GLM 5.1" becomes
                # "Ollama" for the grouped row.  Em dash is the convention
                # Hermes's own writer uses; a hyphen variant is accepted
                # for hand-edited configs.
                display_name = raw_name
                for sep in ("—", " - "):
                    if sep in display_name:
                        display_name = display_name.split(sep)[0].strip()
                        break
                if not display_name:
                    display_name = raw_name
                # If this endpoint matches the currently active one, use
                # ``current_provider`` as the slug so picker-driven switches
                # route through the live credential pipeline.
                if (
                    current_base_url
                    and api_url == current_base_url.strip().rstrip("/")
                ):
                    slug = current_provider or custom_provider_slug(display_name)
                else:
                    slug = custom_provider_slug(display_name)
                groups[group_key] = {
                    "slug": slug,
                    "name": display_name,
                    "api_url": api_url,
                    "models": [],
                }

            group_models = groups[group_key]["models"]
            # The singular ``model:`` field only holds the currently
            # active model.  Hermes's own writer (main.py::_save_custom_provider)
            # stores every configured model as a dict under ``models:``;
            # downstream readers (agent/models_dev.py, gateway/run.py,
            # run_agent.py, hermes_cli/config.py) already consume that dict.
            default_model = (entry.get("model") or "").strip()
            if default_model and default_model not in group_models:
                group_models.append(default_model)
            _extend_unique(group_models, entry.get("models", {}))

        _section4_emitted_slugs: set = set()
        for grp in groups.values():
            slug = grp["slug"]
            # If the slug is already claimed by a built-in / overlay /
            # user-provider row (sections 1-3), skip this custom group
            # to avoid shadowing a real provider.
            if slug.lower() in seen_slugs and slug.lower() not in _section4_emitted_slugs:
                continue
            # If a prior section-4 group already used this slug (two custom
            # endpoints with the same cleaned name — e.g. two OpenAI-
            # compatible gateways named identically with different keys),
            # append a counter so both rows stay visible in the picker.
            if slug.lower() in _section4_emitted_slugs:
                base_slug = slug
                n = 2
                while f"{base_slug}-{n}".lower() in seen_slugs:
                    n += 1
                slug = f"{base_slug}-{n}"
                grp["slug"] = slug
            # Skip if section 3 already emitted this endpoint under its
            # ``providers:`` dict key — matches on (display_name, base_url).
            # Prevents two picker rows labelled identically when callers
            # pass both ``user_providers`` and a compatibility-merged
            # ``custom_providers`` list.
            _pair_key = (
                str(grp["name"]).strip().lower(),
                str(grp["api_url"]).strip().rstrip("/").lower(),
            )
            if _pair_key[0] and _pair_key[1] and _pair_key in _section3_emitted_pairs:
                continue
            results.append({
                "slug": slug,
                "name": grp["name"],
                "is_current": slug == current_provider,
                "is_user_defined": True,
                "models": grp["models"],
                "total_models": len(grp["models"]),
                "source": "user-config",
                "api_url": grp["api_url"],
            })
            seen_slugs.add(slug.lower())
            _section4_emitted_slugs.add(slug.lower())

    # Sort: current provider first, then by model count descending
    results.sort(key=lambda r: (not r["is_current"], -r["total_models"]))

    return results
|