feat(azure-foundry): add Microsoft Entra ID auth

Use azure-identity DefaultAzureCredential for keyless Foundry auth.

Preserve refreshable callable credentials through OpenAI and Anthropic client paths.

Add setup, doctor, auth status, docs, and tests for Entra auth.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
glennc 2026-05-15 14:36:18 -07:00 committed by Teknium
parent 457fa913b8
commit 9df9816dab
38 changed files with 3772 additions and 122 deletions

View file

@ -5334,7 +5334,9 @@ def get_external_process_provider_status(provider_id: str) -> Dict[str, Any]:
def get_auth_status(provider_id: Optional[str] = None) -> Dict[str, Any]:
"""Generic auth status dispatcher."""
target = provider_id or get_active_provider()
target = (provider_id or get_active_provider() or "").strip().lower()
if not target:
return {"logged_in": False}
if target == "spotify":
return get_spotify_auth_status()
if target == "nous":
@ -5351,6 +5353,8 @@ def get_auth_status(provider_id: Optional[str] = None) -> Dict[str, Any]:
return get_minimax_oauth_auth_status()
if target == "copilot-acp":
return get_external_process_provider_status(target)
if target == "azure-foundry":
return _get_azure_foundry_auth_status()
# API-key providers
pconfig = PROVIDER_REGISTRY.get(target)
if pconfig and pconfig.auth_type == "api_key":
@ -5365,6 +5369,83 @@ def get_auth_status(provider_id: Optional[str] = None) -> Dict[str, Any]:
return {"logged_in": False}
def _get_azure_foundry_auth_status() -> Dict[str, Any]:
    """Return structural auth status for Azure Foundry.

    ``logged_in`` is structural, matching other non-OAuth provider status
    checks:

    * ``auth_mode == "entra_id"``: true when ``azure-identity`` is
      importable. No token is minted here; ``hermes doctor`` runs the live
      probe and reports whether the credential chain can acquire one.
    * ``auth_mode == "api_key"`` (default): true when
      ``AZURE_FOUNDRY_API_KEY`` is set with a usable value.

    The Entra credential chain is never invoked here, which keeps CLI
    startup latency flat regardless of token-service / ``az login`` state.

    Returns:
        A dict that always carries ``provider``, ``auth_mode``,
        ``base_url`` and ``logged_in``. In Entra mode it additionally
        carries ``azure_identity_installed``, ``scope``,
        ``credential_probe``, ``credential_verified`` and ``hint`` -- or
        ``error`` when the adapter check itself raised.
    """
    info: Dict[str, Any] = {"provider": "azure-foundry"}

    # Bind the env reader up front: if the config module cannot be imported
    # the name would otherwise stay unbound and the api-key path below would
    # only work because an UnboundLocalError got swallowed.
    get_env_value = None
    try:
        from hermes_cli.config import load_config, get_env_value
        cfg = load_config()
    except Exception:
        cfg = {}

    model_cfg = cfg.get("model") if isinstance(cfg, dict) else None
    auth_mode = "api_key"
    base_url = ""
    if isinstance(model_cfg, dict):
        auth_mode = str(model_cfg.get("auth_mode") or "api_key").strip().lower() or "api_key"
        base_url = str(model_cfg.get("base_url") or "").strip()
    info["auth_mode"] = auth_mode
    info["base_url"] = base_url

    if auth_mode == "entra_id":
        try:
            from agent.azure_identity_adapter import (
                EntraIdentityConfig,
                SCOPE_AI_AZURE_DEFAULT,
                has_azure_identity_installed,
            )

            installed = has_azure_identity_installed()
            entra_cfg = {}
            if isinstance(model_cfg, dict) and isinstance(model_cfg.get("entra"), dict):
                entra_cfg = model_cfg["entra"]
            identity_config = EntraIdentityConfig.from_dict(
                entra_cfg,
                default_scope=SCOPE_AI_AZURE_DEFAULT,
            )
            info["azure_identity_installed"] = installed
            info["scope"] = identity_config.scope
            # The live probe is deliberately not run from this status call.
            info["credential_probe"] = "not_run"
            info["credential_verified"] = False
            info["logged_in"] = bool(installed)
            if not installed:
                info["hint"] = (
                    "azure-identity not installed. Install with: "
                    "pip install azure-identity (or rely on Hermes' "
                    "lazy-install at first use)."
                )
            else:
                info["hint"] = (
                    "azure-identity is installed; live credential validation "
                    "is skipped here. Run `hermes doctor` to verify token acquisition."
                )
            return info
        except Exception as exc:
            info["logged_in"] = False
            info["error"] = f"azure-identity check failed: {exc}"
            return info

    # api_key mode (default): prefer the Hermes env store, then fall back to
    # the process environment when the store is unavailable, fails, or empty.
    api_key = ""
    if get_env_value is not None:
        try:
            api_key = get_env_value("AZURE_FOUNDRY_API_KEY") or ""
        except Exception:
            api_key = ""
    if not api_key:
        api_key = os.getenv("AZURE_FOUNDRY_API_KEY", "")
    info["logged_in"] = has_usable_secret(api_key)
    return info
def resolve_api_key_provider_credentials(provider_id: str) -> Dict[str, Any]:
"""Resolve API key and base URL for an API-key provider.

View file

@ -566,6 +566,54 @@ def _interactive_auth() -> None:
print()
except ImportError:
pass # boto3 or bedrock_adapter not available
# Show Azure Foundry Entra ID status
try:
from hermes_cli.config import load_config
_cfg = load_config()
_model_cfg = _cfg.get("model") if isinstance(_cfg, dict) else None
if isinstance(_model_cfg, dict):
_cfg_provider = str(_model_cfg.get("provider") or "").strip().lower()
_cfg_auth_mode = str(_model_cfg.get("auth_mode") or "").strip().lower()
if _cfg_provider == "azure-foundry" and _cfg_auth_mode == "entra_id":
from agent.azure_identity_adapter import (
EntraIdentityConfig,
SCOPE_AI_AZURE_DEFAULT,
describe_active_credential,
has_azure_identity_installed,
)
_base_url = str(_model_cfg.get("base_url") or "").strip()
_entra = _model_cfg.get("entra") or {}
if not isinstance(_entra, dict):
_entra = {}
_scope = (
str(_entra.get("scope") or "").strip()
or SCOPE_AI_AZURE_DEFAULT
)
print(f"azure-foundry (Microsoft Entra ID):")
print(f" Endpoint: {_base_url or '(not configured)'}")
print(f" Scope: {_scope}")
if not has_azure_identity_installed():
print(" Status: ⚠ azure-identity not installed "
"(pip install azure-identity)")
else:
_entra_cfg = EntraIdentityConfig(
scope=_scope,
)
_info = describe_active_credential(config=_entra_cfg, timeout_seconds=10.0)
_env_sources = _info.get("env_sources") or []
if _info.get("ok"):
_tag = ", ".join(_env_sources) if _env_sources else "default chain"
print(f" Status: ✓ token acquired ({_tag})")
else:
_err = _info.get("error") or "credential chain exhausted"
print(f" Status: ⚠ {_err}")
_hint = _info.get("hint")
if _hint:
print(f" Hint: {_hint}")
print()
except Exception:
pass
print()
# Main menu

View file

@ -1,6 +1,6 @@
"""Azure Foundry endpoint auto-detection.
Inspect an Azure AI Foundry / Azure OpenAI endpoint to determine:
Inspect a Microsoft Foundry / Azure OpenAI endpoint to determine:
- API transport (OpenAI-style ``chat_completions`` vs
Anthropic-style ``anthropic_messages``)
- Available models (best effort — Azure does not expose a deployment
@ -19,6 +19,16 @@ rather than the user's *deployed* deployment names. In practice it is
still a useful hint — the user picks a familiar model name and we look
up its context length from the catalog.
Authentication modes:
- ``api_key`` (default): the wizard passes an ``api_key`` string; the
probe sends both ``api-key:`` and ``Authorization: Bearer`` headers
so we hit any Azure deployment regardless of which header it expects.
- ``entra_id``: the wizard passes a ``token_provider`` callable from
:mod:`agent.azure_identity_adapter`. The probe mints exactly one
bearer JWT, sends **only** ``Authorization: Bearer <jwt>`` (never
``api-key:``), and never persists the token. This matches Microsoft's
documented contract for keyless inference.
The detector never crashes on errors (every HTTP call is wrapped in a
broad try/except). Callers get a :class:`DetectionResult` with whatever
information could be gathered, and fall back to manual entry for the
@ -31,7 +41,7 @@ import json
import logging
import re
from dataclasses import dataclass, field
from typing import Optional
from typing import Any, Callable, Optional
from urllib import request as urllib_request
from urllib.error import HTTPError, URLError
from urllib.parse import urlparse
@ -79,15 +89,73 @@ class DetectionResult:
is_anthropic: bool = False
def _http_get_json(url: str, api_key: str, timeout: float = 6.0) -> tuple[int, Optional[dict]]:
"""GET a URL with ``api-key`` + ``Authorization`` headers. Return
def _mint_bearer_token(provider: Callable[[], Any],
                       failure_msg: str,
                       ) -> tuple[Optional[str], str]:
    """Call ``provider`` once and wrap the outcome as an Entra credential pair."""
    try:
        token = provider()
    except Exception as exc:
        # Best-effort probe: a broken credential chain must not crash the
        # caller, so the failure is logged at debug level and reported as
        # "no token".
        logger.debug(failure_msg, exc)
        return None, "entra_id"
    return (str(token) if token else None), "entra_id"


def _resolve_credential(api_key: Any,
                        token_provider: Optional[Callable[[], str]] = None,
                        ) -> tuple[Optional[str], str]:
    """Coerce wizard inputs into a ``(token, mode)`` pair.

    Args:
        api_key: Either a static API-key string or a zero-argument callable
            that returns a bearer JWT.
        token_provider: Explicit zero-argument callable returning a bearer
            JWT. When it is callable it takes precedence over ``api_key``.

    Returns:
        ``(token_or_None, mode)`` where ``mode`` is:

        - ``"entra_id"`` when a callable was supplied. The token is a
          freshly minted bearer JWT, meant to be sent ONLY in
          ``Authorization: Bearer``. If minting raises or yields an empty
          value the result is ``(None, "entra_id")`` so the caller can
          report "detection incomplete" instead of crashing.
        - ``"api_key"`` when a non-empty string key was supplied. The token
          is the raw key, meant to be sent in BOTH ``api-key:`` and
          ``Authorization: Bearer`` (the original broad-compat probe
          behaviour).
        - ``(None, "api_key")`` when neither input yields a value.
    """
    # Token-provider path (callable wins when both are supplied).
    if callable(token_provider):
        return _mint_bearer_token(
            token_provider, "azure_detect: token_provider failed: %s"
        )
    if callable(api_key) and not isinstance(api_key, str):
        return _mint_bearer_token(
            api_key, "azure_detect: api_key callable failed: %s"
        )

    # API-key path.
    if isinstance(api_key, str) and api_key:
        return api_key, "api_key"
    return None, "api_key"
def _apply_auth_headers(req: urllib_request.Request,
                        token: Optional[str],
                        mode: str) -> None:
    """Attach auth headers to ``req`` according to the credential mode.

    A falsy ``token`` leaves the request untouched. In ``"entra_id"`` mode
    only ``Authorization: Bearer <token>`` is set; in any other mode the
    token is sent both as ``api-key`` and as a bearer token.
    """
    if token:
        bearer_only = mode == "entra_id"
        if not bearer_only:
            # Legacy broad-compat behaviour: static keys go out under both
            # header names so the probe lands on any Azure resource
            # regardless of which one it accepts.
            req.add_header("api-key", token)
        # Entra JWTs are sent here and nowhere else: they must never be
        # placed in the header slot intended for static keys.
        req.add_header("Authorization", f"Bearer {token}")
def _http_get_json(url: str,
api_key: Any,
timeout: float = 6.0,
*,
token_provider: Optional[Callable[[], str]] = None,
) -> tuple[int, Optional[dict]]:
"""GET a URL with the appropriate auth headers. Return
``(status_code, parsed_json_or_None)``. Never raises."""
token, mode = _resolve_credential(api_key, token_provider)
req = urllib_request.Request(url, method="GET")
# Azure OpenAI uses ``api-key``. Some Azure deployments (and
# Anthropic-style routes) use ``Authorization: Bearer``. Send both
# so we probe once per URL rather than twice.
req.add_header("api-key", api_key)
req.add_header("Authorization", f"Bearer {api_key}")
_apply_auth_headers(req, token, mode)
req.add_header("User-Agent", "hermes-agent/azure-detect")
try:
with urllib_request.urlopen(req, timeout=timeout) as resp:
@ -140,7 +208,11 @@ def _extract_model_ids(payload: dict) -> list[str]:
return ids
def _probe_openai_models(base_url: str, api_key: str) -> tuple[bool, list[str]]:
def _probe_openai_models(base_url: str,
api_key: Any,
*,
token_provider: Optional[Callable[[], str]] = None,
) -> tuple[bool, list[str]]:
"""Probe ``<base>/models`` for an OpenAI-shaped response.
Returns ``(ok, models)``. ``ok`` is True iff the endpoint accepted
@ -156,7 +228,7 @@ def _probe_openai_models(base_url: str, api_key: str) -> tuple[bool, list[str]]:
candidates.append(f"{base_url}/models?api-version={v}")
for url in candidates:
status, body = _http_get_json(url, api_key)
status, body = _http_get_json(url, api_key, token_provider=token_provider)
if status == 200 and body is not None:
ids = _extract_model_ids(body)
if ids:
@ -172,7 +244,11 @@ def _probe_openai_models(base_url: str, api_key: str) -> tuple[bool, list[str]]:
return False, []
def _probe_anthropic_messages(base_url: str, api_key: str) -> bool:
def _probe_anthropic_messages(base_url: str,
api_key: Any,
*,
token_provider: Optional[Callable[[], str]] = None,
) -> bool:
"""Send a zero-token request to ``<base>/v1/messages`` and check
whether the endpoint at least *recognises* the Anthropic Messages
shape (any 4xx that mentions ``messages`` or ``model``, or a 400
@ -187,8 +263,8 @@ def _probe_anthropic_messages(base_url: str, api_key: str) -> bool:
"messages": [{"role": "user", "content": "ping"}],
}).encode("utf-8")
req = urllib_request.Request(url, method="POST", data=payload)
req.add_header("api-key", api_key)
req.add_header("Authorization", f"Bearer {api_key}")
token, mode = _resolve_credential(api_key, token_provider)
_apply_auth_headers(req, token, mode)
req.add_header("anthropic-version", "2023-06-01")
req.add_header("content-type", "application/json")
req.add_header("User-Agent", "hermes-agent/azure-detect")
@ -218,13 +294,23 @@ def _probe_anthropic_messages(base_url: str, api_key: str) -> bool:
return False
def detect(base_url: str, api_key: str) -> DetectionResult:
def detect(base_url: str,
api_key: Any = "",
*,
token_provider: Optional[Callable[[], str]] = None,
) -> DetectionResult:
"""Inspect an Azure endpoint and describe its transport + models.
Call this from the wizard before asking the user to pick an API
mode manually. The caller should treat the returned
:class:`DetectionResult` as *advisory* — if ``api_mode`` is None,
fall back to asking the user.
``api_key`` may be a string (legacy API-key auth — sends both
``api-key:`` and ``Authorization: Bearer``) or a callable returning
a bearer JWT (Entra ID auth — sends ONLY ``Authorization: Bearer``).
``token_provider`` is an alternative explicit name for the callable
form; if both are supplied the callable wins.
"""
result = DetectionResult()
@ -244,7 +330,7 @@ def detect(base_url: str, api_key: str) -> DetectionResult:
# 2. Try the OpenAI-style /models probe. If this works, the
# endpoint definitely speaks OpenAI wire.
ok, models = _probe_openai_models(base_url, api_key)
ok, models = _probe_openai_models(base_url, api_key, token_provider=token_provider)
if ok:
result.models_probe_ok = True
result.models = models
@ -259,7 +345,7 @@ def detect(base_url: str, api_key: str) -> DetectionResult:
# 3. Fallback: probe the Anthropic Messages shape. Slower and more
# intrusive than /models, so only run it when the OpenAI probe
# failed.
if _probe_anthropic_messages(base_url, api_key):
if _probe_anthropic_messages(base_url, api_key, token_provider=token_provider):
result.is_anthropic = True
result.api_mode = "anthropic_messages"
result.reason = "Endpoint accepts Anthropic Messages shape"
@ -273,11 +359,26 @@ def detect(base_url: str, api_key: str) -> DetectionResult:
return result
def lookup_context_length(model: str, base_url: str, api_key: str) -> Optional[int]:
def lookup_context_length(model: str,
base_url: str,
api_key: Any = "",
*,
token_provider: Optional[Callable[[], str]] = None,
) -> Optional[int]:
"""Thin wrapper around :func:`agent.model_metadata.get_model_context_length`
that returns ``None`` when only the fallback default (128k) would
fire, so the wizard can distinguish "we actually know this" from
"we guessed."""
"we guessed.
For Entra-ID mode pass a callable as ``api_key`` (or via
``token_provider=``); the wrapped resolver expects a string, so we
mint one bearer JWT here for the single lookup. The resolver itself
only reads catalog metadata over HTTP — no SDK client is built — so
the minted token is consumed for at most one /models probe.
"""
model_id = str(model or "").strip()
if not model_id:
return None
try:
from agent.model_metadata import (
DEFAULT_FALLBACK_CONTEXT,
@ -286,8 +387,13 @@ def lookup_context_length(model: str, base_url: str, api_key: str) -> Optional[i
except Exception:
return None
# Resolve the credential once. For Entra mode this calls the token
# provider; for legacy api_key this is a no-op string pass-through.
token, mode = _resolve_credential(api_key, token_provider)
effective_key = token or ""
try:
n = get_model_context_length(model, base_url=base_url, api_key=api_key)
n = get_model_context_length(model_id, base_url=base_url, api_key=effective_key)
except Exception as exc:
logger.debug("azure_detect: context length lookup failed: %s", exc)
return None

View file

@ -1613,6 +1613,87 @@ def run_doctor(args):
f"bedrock:ListFoundationModels"],
)
def _probe_azure_entra() -> _ConnectivityResult:
    """Probe Azure Foundry Entra ID auth, modelled on ``_probe_bedrock``.

    The probe is skipped (an empty result is returned) unless the active
    config has ``model.provider: azure-foundry`` AND
    ``model.auth_mode: entra_id``, so users on plain API-key Azure never
    trigger the token-service / CLI credential chain.

    A 10-second timeout is passed to
    :func:`agent.azure_identity_adapter.describe_active_credential` so a
    slow token service is not expected to pad the doctor run.

    Returns:
        A ``_ConnectivityResult`` built from the probe name, a list of
        ``(glyph, label, detail)`` rows and a list of issue strings; both
        lists are empty when the probe is skipped.
    """
    name = "Azure Foundry (Entra ID)"
    label = name.ljust(28)

    # Any failure while reading config counts as "not configured for Entra",
    # so the probe stays silent rather than reporting a spurious warning.
    try:
        from hermes_cli.config import load_config

        cfg = load_config()
        model_cfg = cfg.get("model") if isinstance(cfg, dict) else {}
        if not isinstance(model_cfg, dict):
            return _ConnectivityResult(name, [], [])
        cfg_provider = str(model_cfg.get("provider") or "").strip().lower()
        auth_mode = str(model_cfg.get("auth_mode") or "").strip().lower()
        if cfg_provider != "azure-foundry" or auth_mode != "entra_id":
            return _ConnectivityResult(name, [], [])
    except Exception:
        return _ConnectivityResult(name, [], [])

    # NOTE(review): the status glyph passed to color() below is an empty
    # string in every branch; this may be a character lost in transit --
    # confirm against the other probes before changing it.
    try:
        from agent.azure_identity_adapter import (
            EntraIdentityConfig,
            SCOPE_AI_AZURE_DEFAULT,
            describe_active_credential,
            has_azure_identity_installed,
        )
    except Exception as exc:
        return _ConnectivityResult(
            name,
            [(color("", Colors.YELLOW), label,
              color(f"(adapter import failed: {exc})", Colors.DIM))],
            [f"Azure Foundry adapter import failed: {exc}"],
        )

    if not has_azure_identity_installed():
        return _ConnectivityResult(
            name,
            [(color("", Colors.YELLOW), label,
              color("(azure-identity not installed)", Colors.DIM))],
            [f"Install azure-identity: {sys.executable} -m pip install azure-identity"],
        )

    # Only the optional scope override is read from config; everything else
    # is left to the adapter's defaults.
    entra_cfg = model_cfg.get("entra") or {}
    if not isinstance(entra_cfg, dict):
        entra_cfg = {}
    scope = (
        str(entra_cfg.get("scope") or "").strip()
        or SCOPE_AI_AZURE_DEFAULT
    )
    config = EntraIdentityConfig(
        scope=scope,
    )
    info = describe_active_credential(config=config, timeout_seconds=10.0)

    if info.get("ok"):
        env_sources = info.get("env_sources") or []
        tag = ", ".join(env_sources) if env_sources else "default credential chain"
        return _ConnectivityResult(
            name,
            [(color("", Colors.GREEN), label,
              color(f"({tag}, scope={scope})", Colors.DIM))],
            [],
        )

    err = info.get("error") or "credential chain exhausted"
    hint = info.get("hint") or (
        "Run `az login`, set AZURE_TENANT_ID/AZURE_CLIENT_ID/"
        "AZURE_CLIENT_SECRET, or attach a managed identity to this VM."
    )
    return _ConnectivityResult(
        name,
        [(color("", Colors.YELLOW), label,
          color(f"({err})", Colors.DIM))],
        [f"Azure Foundry Entra: {err}. {hint}"],
    )
# Build the probe submission list in display order
_probes.append(("OpenRouter API", _probe_openrouter))
_probes.append(("Anthropic API", _probe_anthropic))
@ -1630,6 +1711,7 @@ def run_doctor(args):
_probe_apikey_provider(p, e, u, b, s)))
_probes.append(("AWS Bedrock", _probe_bedrock))
_probes.append(("Azure Foundry (Entra ID)", _probe_azure_entra))
# Print a single status line so users see something happening, then
# fan out. ``\r`` clears it once the first real result line lands.

View file

@ -3535,11 +3535,27 @@ def _save_custom_provider(
def _model_flow_azure_foundry(config, current_model=""):
"""Azure Foundry provider: configure endpoint, API mode, API key, and model.
"""Azure Foundry provider: configure endpoint, auth mode, API mode, and model.
Azure Foundry supports both OpenAI-style (``/v1/chat/completions``) and
Anthropic-style (``/v1/messages``) endpoints. The wizard auto-detects
the transport and available models when possible:
Anthropic-style (``/v1/messages``) endpoints, and two authentication
modes:
* **API key** (default) — uses ``AZURE_FOUNDRY_API_KEY`` from .env.
* **Microsoft Entra ID** — keyless, RBAC-based auth via the
``azure-identity`` SDK (Managed Identity / Workload Identity / az
login / VS Code / azd / service principal env vars). Works on both
OpenAI-style and Anthropic-style endpoints — Microsoft RBAC is
per-resource and the same ``Azure AI User`` role grants
both. For OpenAI-style the OpenAI SDK's native callable
``api_key=`` contract is used; for Anthropic-style an
``httpx.Client`` with a request event hook (built by
:func:`agent.azure_identity_adapter.build_bearer_http_client`)
mints a fresh JWT per request because the Anthropic SDK does not
accept a callable ``auth_token`` natively.
The wizard auto-detects the transport and available models when
possible:
* URLs ending in ``/anthropic`` → Anthropic Messages API.
* Successful ``GET <base>/models`` probe → OpenAI-style + populates
@ -3566,9 +3582,14 @@ def _model_flow_azure_foundry(config, current_model=""):
if isinstance(model_cfg, dict) and model_cfg.get("provider") == "azure-foundry":
current_base_url = str(model_cfg.get("base_url", "") or "")
current_api_mode = str(model_cfg.get("api_mode", "") or "")
current_auth_mode = str(model_cfg.get("auth_mode") or "api_key").strip().lower() or "api_key"
_cur_entra = model_cfg.get("entra") or {}
current_entra = _cur_entra if isinstance(_cur_entra, dict) else {}
else:
current_base_url = ""
current_api_mode = ""
current_auth_mode = "api_key"
current_entra = {}
current_api_key = get_env_value("AZURE_FOUNDRY_API_KEY") or ""
@ -3583,22 +3604,29 @@ def _model_flow_azure_foundry(config, current_model=""):
print()
if current_base_url:
print(f" Current endpoint: {current_base_url}")
print(f" Current endpoint: {current_base_url}")
if current_api_mode:
_lbl = (
"OpenAI-style"
if current_api_mode == "chat_completions"
else "Anthropic-style"
)
print(f" Current API mode: {_lbl}")
if current_api_key:
print(f" Current API key: {current_api_key[:8]}...")
print(f" Current API mode: {_lbl}")
if current_auth_mode == "entra_id":
print(f" Current auth mode: Microsoft Entra ID (keyless)")
elif current_api_key:
print(f" Current auth mode: API key ({current_api_key[:8]}...)")
print()
# ── Step 1: endpoint URL ─────────────────────────────────────────
try:
_placeholder = (
current_base_url
or "e.g. https://<resource>.openai.azure.com/openai/v1 "
"or https://<resource>.services.ai.azure.com/anthropic"
)
base_url = input(
f"API endpoint URL [{current_base_url or 'e.g. https://your-resource.openai.azure.com/openai/v1'}]: "
f"API endpoint URL [{_placeholder}]: "
).strip()
except (KeyboardInterrupt, EOFError):
print("\nCancelled.")
@ -3612,25 +3640,125 @@ def _model_flow_azure_foundry(config, current_model=""):
print(f"Invalid URL: {effective_url} (must start with http:// or https://)")
return
# ── Step 2: API key ──────────────────────────────────────────────
# ── Step 2: authentication mode ──────────────────────────────────
print()
print("Authentication:")
print(" 1. API key (AZURE_FOUNDRY_API_KEY in .env)")
print(" 2. Microsoft Entra ID (managed identity / workload identity / az login)")
print(" Recommended by Microsoft. Works for both OpenAI-style and Anthropic-style endpoints.")
print(" Requires the 'Azure AI User' role on the Foundry resource.")
try:
api_key = getpass.getpass(
f"API key [{current_api_key[:8] + '...' if current_api_key else 'required'}]: "
).strip()
_auth_default = "2" if current_auth_mode == "entra_id" else "1"
auth_choice = (
input(f"Authentication mode [1/2] ({_auth_default}): ").strip()
or _auth_default
)
except (KeyboardInterrupt, EOFError):
print("\nCancelled.")
return
use_entra = auth_choice == "2"
auth_mode_label = "entra_id" if use_entra else "api_key"
effective_key = api_key or current_api_key
if not effective_key:
print("No API key provided. Cancelled.")
return
# ── Step 3: credentials (key OR Entra preflight) ─────────────────
effective_key: str = ""
entra_overrides: dict = {}
token_provider = None # callable when entra
entra_scope = ""
# ── Step 3: auto-detect transport + models ───────────────────────
if use_entra:
try:
from agent.azure_identity_adapter import (
EntraIdentityConfig,
SCOPE_AI_AZURE_DEFAULT,
build_token_provider,
describe_active_credential,
has_azure_identity_installed,
)
except ImportError as exc:
print()
print(f"⚠ Could not import azure-identity adapter: {exc}")
print(" Falling back to API key auth.")
use_entra = False
auth_mode_label = "api_key"
if use_entra:
print()
if not has_azure_identity_installed():
print("◐ The 'azure-identity' package is not installed yet.")
print(
" Hermes will install it now (the preflight below "
"triggers the lazy-install). To skip lazy installs, "
"run: pip install azure-identity"
)
# Preserve only the optional scope override. Identity selection
# (tenant, user-assigned MI, workload identity, service principal)
# stays in Azure SDK env vars such as AZURE_CLIENT_ID.
_persisted_scope_override = str(current_entra.get("scope") or "").strip()
entra_scope = _persisted_scope_override or SCOPE_AI_AZURE_DEFAULT
entra_overrides = {}
if _persisted_scope_override:
entra_overrides["scope"] = _persisted_scope_override
print()
print("◐ Probing Microsoft Entra ID credential chain (up to 10s)...")
_config = EntraIdentityConfig(
scope=entra_scope,
)
info = describe_active_credential(config=_config, timeout_seconds=10.0)
if info.get("ok"):
env_sources = info.get("env_sources") or []
tag = ", ".join(env_sources) if env_sources else "default chain"
print(f"✓ Entra ID token acquired ({tag}, scope={entra_scope})")
else:
err = info.get("error") or "credential chain exhausted"
hint = info.get("hint") or (
"Run `az login`, attach a managed identity to this VM, or "
"set AZURE_TENANT_ID/AZURE_CLIENT_ID/AZURE_CLIENT_SECRET."
)
print(f"{err}")
print(f" Hint: {hint}")
try:
ans = input("Save Entra config anyway and validate later? [Y/n]: ").strip().lower()
except (KeyboardInterrupt, EOFError):
print("\nCancelled.")
return
if ans and ans not in ("y", "yes"):
print("Cancelled.")
return
# Build the token provider for the detection probe (best-effort —
# if the credential chain failed above, this will silently return
# None inside azure_detect and the probe falls back to manual).
try:
token_provider = build_token_provider(config=_config)
except Exception as exc:
print(f"⚠ Could not build token provider for probing: {exc}")
token_provider = None
else:
print()
try:
api_key = getpass.getpass(
f"API key [{current_api_key[:8] + '...' if current_api_key else 'required'}]: "
).strip()
except (KeyboardInterrupt, EOFError):
print("\nCancelled.")
return
effective_key = api_key or current_api_key
if not effective_key:
print("No API key provided. Cancelled.")
return
# ── Step 4: auto-detect transport + models ───────────────────────
print()
print("◐ Probing endpoint to auto-detect transport and models...")
detection = azure_detect.detect(effective_url, effective_key)
detection = azure_detect.detect(
effective_url,
api_key=effective_key,
token_provider=token_provider,
)
discovered_models: list[str] = list(detection.models)
api_mode: str = detection.api_mode or ""
@ -3665,7 +3793,7 @@ def _model_flow_azure_foundry(config, current_model=""):
return
api_mode = "anthropic_messages" if mode_choice == "2" else "chat_completions"
# ── Step 4: model name ───────────────────────────────────────────
# ── Step 5: model name ───────────────────────────────────────────
print()
effective_model = ""
if discovered_models:
@ -3704,15 +3832,17 @@ def _model_flow_azure_foundry(config, current_model=""):
print("No model name provided. Cancelled.")
return
# ── Step 5: context-length lookup ────────────────────────────────
# ── Step 6: context-length lookup ────────────────────────────────
ctx_len = azure_detect.lookup_context_length(
effective_model,
effective_url,
effective_key,
api_key=effective_key,
token_provider=token_provider,
)
# ── Step 6: persist ──────────────────────────────────────────────
save_env_value("AZURE_FOUNDRY_API_KEY", effective_key)
# ── Step 7: persist ──────────────────────────────────────────────
if not use_entra:
save_env_value("AZURE_FOUNDRY_API_KEY", effective_key)
cfg = load_config()
model = cfg.get("model")
@ -3724,6 +3854,22 @@ def _model_flow_azure_foundry(config, current_model=""):
model["base_url"] = effective_url
model["api_mode"] = api_mode
model["default"] = effective_model
model["auth_mode"] = auth_mode_label
if use_entra:
# Persist only the non-default Entra scope so config.yaml stays tidy.
# Azure identity selection stays in standard AZURE_* env vars.
clean_entra: dict = {}
for key in ("scope",):
val = entra_overrides.get(key)
if val:
clean_entra[key] = val
if clean_entra:
model["entra"] = clean_entra
elif "entra" in model:
del model["entra"]
else:
if "entra" in model:
del model["entra"]
if ctx_len:
model["context_length"] = ctx_len
@ -3739,10 +3885,14 @@ def _model_flow_azure_foundry(config, current_model=""):
save_env_value("OPENAI_API_KEY", "")
mode_label = "OpenAI-style" if api_mode == "chat_completions" else "Anthropic-style"
auth_label = (
"Microsoft Entra ID (keyless)" if use_entra else "API key"
)
print()
print("✓ Azure Foundry configured:")
print(f" Endpoint: {effective_url}")
print(f" API mode: {mode_label}")
print(f" Auth: {auth_label}")
print(f" Model: {effective_model}")
if ctx_len:
print(f" Context length: {ctx_len:,} tokens")

View file

@ -744,6 +744,15 @@ def _resolve_azure_foundry_runtime(
strips a trailing ``/v1`` for Anthropic-style endpoints because the
Anthropic SDK appends ``/v1/messages`` internally.
When ``model.auth_mode == "entra_id"`` (and the model is OpenAI-style),
the returned ``api_key`` is a zero-arg callable produced by
:func:`agent.azure_identity_adapter.build_token_provider` rather than
a string. Downstream code that constructs an OpenAI SDK client passes
this through unchanged (the SDK accepts ``Callable[[], str]`` for
``api_key`` and calls it before every request). Code paths that need
a string (logging, manual HTTP probes, header injection) must use the
helpers in ``agent.azure_identity_adapter``.
Raises :class:`AuthError` when required values are missing.
"""
explicit_api_key = str(explicit_api_key or "").strip()
@ -752,9 +761,15 @@ def _resolve_azure_foundry_runtime(
cfg_provider = str(model_cfg.get("provider") or "").strip().lower()
cfg_base_url = ""
cfg_api_mode = "chat_completions"
cfg_auth_mode = "api_key"
cfg_entra: Dict[str, Any] = {}
if cfg_provider == "azure-foundry":
cfg_base_url = str(model_cfg.get("base_url") or "").strip().rstrip("/")
cfg_api_mode = _parse_api_mode(model_cfg.get("api_mode")) or "chat_completions"
cfg_auth_mode = str(model_cfg.get("auth_mode") or "api_key").strip().lower() or "api_key"
_entra = model_cfg.get("entra")
if isinstance(_entra, dict):
cfg_entra = _entra
# Model-family inference: Azure Foundry deploys GPT-5.x / codex / o1-o4
# reasoning models as Responses-API-only. Calling /chat/completions
@ -780,6 +795,79 @@ def _resolve_azure_foundry_runtime(
"the AZURE_FOUNDRY_BASE_URL environment variable."
)
# Anthropic SDK appends /v1/messages itself, so strip any trailing /v1
# we inherited from the configured base_url to avoid double-/v1 paths.
if cfg_api_mode == "anthropic_messages":
base_url = re.sub(r"/v1/?$", "", base_url)
# ── Entra ID (Microsoft Foundry recommended path) ──────────────────
#
# OpenAI-style endpoints use the OpenAI SDK's native callable
# ``api_key=`` contract — the SDK mints a fresh JWT per request
# automatically.
#
# Anthropic-style endpoints (Claude on Foundry) take the callable
# too: :func:`agent.anthropic_adapter.build_anthropic_client`
# detects the callable and constructs an ``httpx.Client`` with a
# request event hook that injects a fresh ``Authorization: Bearer``
# header per request (the Anthropic SDK does not accept callables
# natively). From the runtime resolver's perspective both modes
# are identical — return the callable api_key and let the
# downstream SDK wrapper handle the contract difference.
if cfg_auth_mode == "entra_id":
if explicit_api_key:
# User passed --api-key on the CLI while config says entra_id —
# honour the explicit string (escape hatch for one-off testing).
api_key: Any = explicit_api_key
source = "explicit"
auth_mode = "api_key"
else:
try:
from agent.azure_identity_adapter import (
EntraIdentityConfig,
SCOPE_AI_AZURE_DEFAULT,
build_token_provider,
)
except Exception as exc:
raise AuthError(
"Azure Foundry Entra ID auth requires the 'azure-identity' "
"package. Install it with: pip install azure-identity "
f"(import failed: {exc})"
) from exc
scope = (
str(cfg_entra.get("scope") or "").strip()
or SCOPE_AI_AZURE_DEFAULT
)
try:
entra_config = EntraIdentityConfig(
scope=scope,
)
token_provider = build_token_provider(config=entra_config)
except ImportError as exc:
raise AuthError(str(exc)) from exc
api_key = token_provider
source = "entra_id"
auth_mode = "entra_id"
clean_entra = {}
if auth_mode == "entra_id":
configured_scope = str(cfg_entra.get("scope") or "").strip()
if configured_scope:
clean_entra["scope"] = configured_scope
return {
"provider": "azure-foundry",
"api_mode": cfg_api_mode,
"base_url": base_url,
"api_key": api_key,
"auth_mode": auth_mode,
"entra": clean_entra,
"source": source,
"requested_provider": requested_provider,
}
# ── Static API key (legacy / default) ──────────────────────────────
api_key = explicit_api_key
if not api_key:
try:
@ -792,20 +880,19 @@ def _resolve_azure_foundry_runtime(
if not api_key:
raise AuthError(
"Azure Foundry requires an API key. Set AZURE_FOUNDRY_API_KEY in "
"~/.hermes/.env or run 'hermes model' to configure."
"~/.hermes/.env or run 'hermes model' to configure. To use "
"keyless Microsoft Entra ID auth instead, set "
"model.auth_mode: entra_id in config.yaml (or pick "
"'Microsoft Entra ID' in 'hermes model')."
)
# Anthropic SDK appends /v1/messages itself, so strip any trailing /v1
# we inherited from the configured base_url to avoid double-/v1 paths.
if cfg_api_mode == "anthropic_messages":
base_url = re.sub(r"/v1/?$", "", base_url)
source = "explicit" if (explicit_api_key or explicit_base_url) else "config"
return {
"provider": "azure-foundry",
"api_mode": cfg_api_mode,
"base_url": base_url,
"api_key": api_key,
"auth_mode": "api_key",
"source": source,
"requested_provider": requested_provider,
}
@ -1232,7 +1319,7 @@ def resolve_runtime_provider(
cfg_base_url = (model_cfg.get("base_url") or "").strip().rstrip("/")
base_url = cfg_base_url or "https://api.anthropic.com"
# For Azure AI Foundry endpoints, use ANTHROPIC_API_KEY directly —
# For Microsoft Foundry endpoints, use ANTHROPIC_API_KEY directly —
# Claude Code OAuth tokens (sk-ant-oat01) are not accepted by Azure.
# Azure keys don't start with "sk-ant-" so resolve_anthropic_token()
# would find the Claude Code OAuth token first (priority 3) and return

View file

@ -1288,9 +1288,15 @@ def _truncate_token(value: Optional[str], visible: int = 6) -> str:
OAuth access token. JWT prefixes (the part before the first dot) are
stripped first when present so the visible suffix is always part of
the signing region rather than a meaningless header chunk.
Returns the Entra-ID placeholder when handed a callable (Azure Foundry
bearer provider) — the callable is NEVER invoked here.
"""
if not value:
return ""
if callable(value) and not isinstance(value, str):
# Entra ID bearer provider — never reveal a minted token in the UI.
return "<entra-id-bearer>"
s = str(value)
if "." in s and s.count(".") >= 2:
# Looks like a JWT — show the trailing piece of the signature only.