fix(auth): unify credential source removal — every source sticks (#13427)

Every credential source Hermes reads from now behaves identically on
`hermes auth remove`: the pool entry stays gone across fresh load_pool()
calls, even when the underlying external state (env var, OAuth file,
auth.json block, config entry) is still present.

Before this, auth_remove_command was a 110-line if/elif with five
special cases, and three more sources (qwen-cli, copilot, custom
config) had no removal handler at all — their pool entries silently
resurrected on the next invocation.  Even the handled cases diverged:
codex suppressed, anthropic deleted-without-suppressing, nous cleared
without suppressing.  Each new provider added a new gap.

What's new:
  agent/credential_sources.py — RemovalStep registry, one entry per
  source (env, claude_code, hermes_pkce, nous device_code, codex
  device_code, qwen-cli, copilot gh_cli + env vars, custom config).
  auth_remove_command dispatches uniformly via find_removal_step().

Changes elsewhere:
  agent/credential_pool.py — every upsert in _seed_from_env,
  _seed_from_singletons, and _seed_custom_pool now gates on
  is_source_suppressed(provider, source) via a shared helper.
  hermes_cli/auth_commands.py — auth_remove_command reduced to 25
  lines of dispatch; auth_add_command now clears ALL suppressions for
  the provider on re-add (was env:* only).

Copilot is special: the same token is seeded twice (gh_cli via
_seed_from_singletons + env:<VAR> via _seed_from_env), so removing one
entry without suppressing the other variants lets the duplicate
resurrect.  The copilot RemovalStep suppresses gh_cli + all three env
variants (COPILOT_GITHUB_TOKEN, GH_TOKEN, GITHUB_TOKEN) at once.

Tests: 11 new unit tests + 4059 existing pass.  12 E2E scenarios cover
every source in isolated HERMES_HOME with simulated fresh processes.
This commit is contained in:
Teknium 2026-04-21 01:52:49 -07:00 committed by GitHub
parent e0dc0a88d3
commit 2c69b3eca8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 793 additions and 179 deletions

View file

@ -983,6 +983,14 @@ def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> Tup
active_sources: Set[str] = set() active_sources: Set[str] = set()
auth_store = _load_auth_store() auth_store = _load_auth_store()
# Shared suppression gate — used at every upsert site so
# `hermes auth remove <provider> <N>` is stable across all source types.
try:
from hermes_cli.auth import is_source_suppressed as _is_suppressed
except ImportError:
def _is_suppressed(_p, _s): # type: ignore[misc]
return False
if provider == "anthropic": if provider == "anthropic":
# Only auto-discover external credentials (Claude Code, Hermes PKCE) # Only auto-discover external credentials (Claude Code, Hermes PKCE)
# when the user has explicitly configured anthropic as their provider. # when the user has explicitly configured anthropic as their provider.
@ -1002,13 +1010,8 @@ def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> Tup
("claude_code", read_claude_code_credentials()), ("claude_code", read_claude_code_credentials()),
): ):
if creds and creds.get("accessToken"): if creds and creds.get("accessToken"):
# Check if user explicitly removed this source if _is_suppressed(provider, source_name):
try:
from hermes_cli.auth import is_source_suppressed
if is_source_suppressed(provider, source_name):
continue continue
except ImportError:
pass
active_sources.add(source_name) active_sources.add(source_name)
changed |= _upsert_entry( changed |= _upsert_entry(
entries, entries,
@ -1026,7 +1029,7 @@ def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> Tup
elif provider == "nous": elif provider == "nous":
state = _load_provider_state(auth_store, "nous") state = _load_provider_state(auth_store, "nous")
if state: if state and not _is_suppressed(provider, "device_code"):
active_sources.add("device_code") active_sources.add("device_code")
# Prefer a user-supplied label embedded in the singleton state # Prefer a user-supplied label embedded in the singleton state
# (set by persist_nous_credentials(label=...) when the user ran # (set by persist_nous_credentials(label=...) when the user ran
@ -1067,6 +1070,7 @@ def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> Tup
token, source = resolve_copilot_token() token, source = resolve_copilot_token()
if token: if token:
source_name = "gh_cli" if "gh" in source.lower() else f"env:{source}" source_name = "gh_cli" if "gh" in source.lower() else f"env:{source}"
if not _is_suppressed(provider, source_name):
active_sources.add(source_name) active_sources.add(source_name)
pconfig = PROVIDER_REGISTRY.get(provider) pconfig = PROVIDER_REGISTRY.get(provider)
changed |= _upsert_entry( changed |= _upsert_entry(
@ -1096,6 +1100,7 @@ def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> Tup
token = creds.get("api_key", "") token = creds.get("api_key", "")
if token: if token:
source_name = creds.get("source", "qwen-cli") source_name = creds.get("source", "qwen-cli")
if not _is_suppressed(provider, source_name):
active_sources.add(source_name) active_sources.add(source_name)
changed |= _upsert_entry( changed |= _upsert_entry(
entries, entries,
@ -1118,13 +1123,7 @@ def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> Tup
# the device_code source as suppressed so it won't be re-seeded from # the device_code source as suppressed so it won't be re-seeded from
# the Hermes auth store. Without this gate the removal is instantly # the Hermes auth store. Without this gate the removal is instantly
# undone on the next load_pool() call. # undone on the next load_pool() call.
codex_suppressed = False if _is_suppressed(provider, "device_code"):
try:
from hermes_cli.auth import is_source_suppressed
codex_suppressed = is_source_suppressed(provider, "device_code")
except ImportError:
pass
if codex_suppressed:
return changed, active_sources return changed, active_sources
state = _load_provider_state(auth_store, "openai-codex") state = _load_provider_state(auth_store, "openai-codex")
@ -1256,6 +1255,13 @@ def _seed_custom_pool(pool_key: str, entries: List[PooledCredential]) -> Tuple[b
changed = False changed = False
active_sources: Set[str] = set() active_sources: Set[str] = set()
# Shared suppression gate — same pattern as _seed_from_env/_seed_from_singletons.
try:
from hermes_cli.auth import is_source_suppressed as _is_suppressed
except ImportError:
def _is_suppressed(_p, _s): # type: ignore[misc]
return False
# Seed from the custom_providers config entry's api_key field # Seed from the custom_providers config entry's api_key field
cp_config = _get_custom_provider_config(pool_key) cp_config = _get_custom_provider_config(pool_key)
if cp_config: if cp_config:
@ -1264,6 +1270,7 @@ def _seed_custom_pool(pool_key: str, entries: List[PooledCredential]) -> Tuple[b
name = str(cp_config.get("name") or "").strip() name = str(cp_config.get("name") or "").strip()
if api_key: if api_key:
source = f"config:{name}" source = f"config:{name}"
if not _is_suppressed(pool_key, source):
active_sources.add(source) active_sources.add(source)
changed |= _upsert_entry( changed |= _upsert_entry(
entries, entries,
@ -1296,6 +1303,7 @@ def _seed_custom_pool(pool_key: str, entries: List[PooledCredential]) -> Tuple[b
matched_key = get_custom_provider_pool_key(model_base_url) matched_key = get_custom_provider_pool_key(model_base_url)
if matched_key == pool_key: if matched_key == pool_key:
source = "model_config" source = "model_config"
if not _is_suppressed(pool_key, source):
active_sources.add(source) active_sources.add(source)
changed |= _upsert_entry( changed |= _upsert_entry(
entries, entries,

401
agent/credential_sources.py Normal file
View file

@ -0,0 +1,401 @@
"""Unified removal contract for every credential source Hermes reads from.
Hermes seeds its credential pool from many places:
env:<VAR> os.environ / ~/.hermes/.env
claude_code ~/.claude/.credentials.json
hermes_pkce ~/.hermes/.anthropic_oauth.json
device_code auth.json providers.<provider> (nous, openai-codex, ...)
qwen-cli ~/.qwen/oauth_creds.json
gh_cli gh auth token
config:<name> custom_providers config entry
model_config model.api_key when model.provider == "custom"
manual user ran `hermes auth add`
Each source has its own reader inside ``agent.credential_pool._seed_from_*``
(which keep their existing shape we haven't restructured them). What we
unify here is **removal**:
``hermes auth remove <provider> <N>`` must make the pool entry stay gone.
Before this module, every source had an ad-hoc removal branch in
``auth_remove_command``, and several sources had no branch at all so
``auth remove`` silently reverted on the next ``load_pool()`` call for
qwen-cli, nous device_code (partial), hermes_pkce, copilot gh_cli, and
custom-config sources.
Now every source registers a ``RemovalStep`` that does exactly three things
in the same shape:
1. Clean up whatever externally-readable state the source reads from
(.env line, auth.json block, OAuth file, etc.)
2. Suppress the ``(provider, source_id)`` in auth.json so the
corresponding ``_seed_from_*`` branch skips the upsert on re-load
3. Return ``RemovalResult`` describing what was cleaned and any
diagnostic hints the user should see (shell-exported env vars,
external credential files we deliberately don't delete, etc.)
Adding a new credential source is:
- wire up a reader branch in ``_seed_from_*`` (existing pattern)
- gate that reader behind ``is_source_suppressed(provider, source_id)``
- register a ``RemovalStep`` here
No more per-source if/elif chain in ``auth_remove_command``.
"""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, List, Optional
@dataclass
class RemovalResult:
"""Outcome of removing a credential source.
Attributes:
cleaned: Short strings describing external state that was actually
mutated (``"Cleared XAI_API_KEY from .env"``,
``"Cleared openai-codex OAuth tokens from auth store"``).
Printed as plain lines to the user.
hints: Diagnostic lines ABOUT state the user may need to clean up
themselves or is deliberately left intact (shell-exported env
var, Claude Code credential file we don't delete, etc.).
Printed as plain lines to the user. Always non-destructive.
suppress: Whether to call ``suppress_credential_source`` after
cleanup so future ``load_pool`` calls skip this source.
Default True almost every source needs this to stay sticky.
The only legitimate False is ``manual`` entries, which aren't
seeded from anywhere external.
"""
cleaned: List[str] = field(default_factory=list)
hints: List[str] = field(default_factory=list)
suppress: bool = True
@dataclass
class RemovalStep:
"""How to remove one specific credential source cleanly.
Attributes:
provider: Provider pool key (``"xai"``, ``"anthropic"``, ``"nous"``, ...).
Special value ``"*"`` means "matches any provider" used for
sources like ``manual`` that aren't provider-specific.
source_id: Source identifier as it appears in
``PooledCredential.source``. May be a literal (``"claude_code"``)
or a prefix pattern matched via ``match_fn``.
match_fn: Optional predicate overriding literal ``source_id``
matching. Gets the removed entry's source string. Used for
``env:*`` (any env-seeded key), ``config:*`` (any custom
pool), and ``manual:*`` (any manual-source variant).
remove_fn: ``(provider, removed_entry) -> RemovalResult``. Does the
actual cleanup and returns what happened for the user.
description: One-line human-readable description for docs / tests.
"""
provider: str
source_id: str
remove_fn: Callable[..., RemovalResult]
match_fn: Optional[Callable[[str], bool]] = None
description: str = ""
def matches(self, provider: str, source: str) -> bool:
if self.provider != "*" and self.provider != provider:
return False
if self.match_fn is not None:
return self.match_fn(source)
return source == self.source_id
_REGISTRY: List[RemovalStep] = []
def register(step: RemovalStep) -> RemovalStep:
_REGISTRY.append(step)
return step
def find_removal_step(provider: str, source: str) -> Optional[RemovalStep]:
"""Return the first matching RemovalStep, or None if unregistered.
Unregistered sources fall through to the default remove path in
``auth_remove_command``: the pool entry is already gone (that happens
before dispatch), no external cleanup, no suppression. This is the
correct behaviour for ``manual`` entries they were only ever stored
in the pool, nothing external to clean up.
"""
for step in _REGISTRY:
if step.matches(provider, source):
return step
return None
# ---------------------------------------------------------------------------
# Individual RemovalStep implementations — one per source.
# ---------------------------------------------------------------------------
# Each remove_fn is intentionally small and single-purpose. Adding a new
# credential source means adding ONE entry here — no other changes to
# auth_remove_command.
def _remove_env_source(provider: str, removed) -> RemovalResult:
"""env:<VAR> — the most common case.
Handles three user situations:
1. Var lives only in ~/.hermes/.env clear it
2. Var lives only in the user's shell (shell profile, systemd
EnvironmentFile, launchd plist) hint them where to unset it
3. Var lives in both clear from .env, hint about shell
"""
from hermes_cli.config import get_env_path, remove_env_value
result = RemovalResult()
env_var = removed.source[len("env:"):]
if not env_var:
return result
# Detect shell vs .env BEFORE remove_env_value pops os.environ.
env_in_process = bool(os.getenv(env_var))
env_in_dotenv = False
try:
env_path = get_env_path()
if env_path.exists():
env_in_dotenv = any(
line.strip().startswith(f"{env_var}=")
for line in env_path.read_text(errors="replace").splitlines()
)
except OSError:
pass
shell_exported = env_in_process and not env_in_dotenv
cleared = remove_env_value(env_var)
if cleared:
result.cleaned.append(f"Cleared {env_var} from .env")
if shell_exported:
result.hints.extend([
f"Note: {env_var} is still set in your shell environment "
f"(not in ~/.hermes/.env).",
" Unset it there (shell profile, systemd EnvironmentFile, "
"launchd plist, etc.) or it will keep being visible to Hermes.",
f" The pool entry is now suppressed — Hermes will ignore "
f"{env_var} until you run `hermes auth add {provider}`.",
])
else:
result.hints.append(
f"Suppressed env:{env_var} — it will not be re-seeded even "
f"if the variable is re-exported later."
)
return result
def _remove_claude_code(provider: str, removed) -> RemovalResult:
"""~/.claude/.credentials.json is owned by Claude Code itself.
We don't delete it — the user's Claude Code install still needs to
work. We just suppress it so Hermes stops reading it.
"""
return RemovalResult(hints=[
"Suppressed claude_code credential — it will not be re-seeded.",
"Note: Claude Code credentials still live in ~/.claude/.credentials.json",
"Run `hermes auth add anthropic` to re-enable if needed.",
])
def _remove_hermes_pkce(provider: str, removed) -> RemovalResult:
"""~/.hermes/.anthropic_oauth.json is ours — delete it outright."""
from hermes_constants import get_hermes_home
result = RemovalResult()
oauth_file = get_hermes_home() / ".anthropic_oauth.json"
if oauth_file.exists():
try:
oauth_file.unlink()
result.cleaned.append("Cleared Hermes Anthropic OAuth credentials")
except OSError as exc:
result.hints.append(f"Could not delete {oauth_file}: {exc}")
return result
def _clear_auth_store_provider(provider: str) -> bool:
"""Delete auth_store.providers[provider]. Returns True if deleted."""
from hermes_cli.auth import (
_auth_store_lock,
_load_auth_store,
_save_auth_store,
)
with _auth_store_lock():
auth_store = _load_auth_store()
providers_dict = auth_store.get("providers")
if isinstance(providers_dict, dict) and provider in providers_dict:
del providers_dict[provider]
_save_auth_store(auth_store)
return True
return False
def _remove_nous_device_code(provider: str, removed) -> RemovalResult:
"""Nous OAuth lives in auth.json providers.nous — clear it and suppress.
We suppress in addition to clearing because nothing else stops the
user's next `hermes login` run from writing providers.nous again
before they decide to. Suppression forces them to go through
`hermes auth add nous` to re-engage, which is the documented re-add
path and clears the suppression atomically.
"""
result = RemovalResult()
if _clear_auth_store_provider(provider):
result.cleaned.append(f"Cleared {provider} OAuth tokens from auth store")
return result
def _remove_codex_device_code(provider: str, removed) -> RemovalResult:
"""Codex tokens live in TWO places: our auth store AND ~/.codex/auth.json.
refresh_codex_oauth_pure() writes both every time, so clearing only
the Hermes auth store is not enough _seed_from_singletons() would
re-import from ~/.codex/auth.json on the next load_pool() call and
the removal would be instantly undone. We suppress instead of
deleting Codex CLI's file, so the Codex CLI itself keeps working.
The canonical source name in ``_seed_from_singletons`` is
``"device_code"`` (no prefix). Entries may show up in the pool as
either ``"device_code"`` (seeded) or ``"manual:device_code"`` (added
via ``hermes auth add openai-codex``), but in both cases the re-seed
gate lives at the ``"device_code"`` suppression key. We suppress
that canonical key here; the central dispatcher also suppresses
``removed.source`` which is fine belt-and-suspenders, idempotent.
"""
from hermes_cli.auth import suppress_credential_source
result = RemovalResult()
if _clear_auth_store_provider(provider):
result.cleaned.append(f"Cleared {provider} OAuth tokens from auth store")
# Suppress the canonical re-seed source, not just whatever source the
# removed entry had. Otherwise `manual:device_code` removals wouldn't
# block the `device_code` re-seed path.
suppress_credential_source(provider, "device_code")
result.hints.extend([
"Suppressed openai-codex device_code source — it will not be re-seeded.",
"Note: Codex CLI credentials still live in ~/.codex/auth.json",
"Run `hermes auth add openai-codex` to re-enable if needed.",
])
return result
def _remove_qwen_cli(provider: str, removed) -> RemovalResult:
"""~/.qwen/oauth_creds.json is owned by the Qwen CLI.
Same pattern as claude_code suppress, don't delete. The user's
Qwen CLI install still reads from that file.
"""
return RemovalResult(hints=[
"Suppressed qwen-cli credential — it will not be re-seeded.",
"Note: Qwen CLI credentials still live in ~/.qwen/oauth_creds.json",
"Run `hermes auth add qwen-oauth` to re-enable if needed.",
])
def _remove_copilot_gh(provider: str, removed) -> RemovalResult:
"""Copilot token comes from `gh auth token` or COPILOT_GITHUB_TOKEN / GH_TOKEN / GITHUB_TOKEN.
Copilot is special: the same token can be seeded as multiple source
entries (gh_cli from ``_seed_from_singletons`` plus env:<VAR> from
``_seed_from_env``), so removing one entry without suppressing the
others lets the duplicates resurrect. We suppress ALL known copilot
sources here so removal is stable regardless of which entry the
user clicked.
We don't touch the user's gh CLI or shell state just suppress so
Hermes stops picking the token up.
"""
# Suppress ALL copilot source variants up-front so no path resurrects
# the pool entry. The central dispatcher in auth_remove_command will
# ALSO suppress removed.source, but it's idempotent so double-calling
# is harmless.
from hermes_cli.auth import suppress_credential_source
suppress_credential_source(provider, "gh_cli")
for env_var in ("COPILOT_GITHUB_TOKEN", "GH_TOKEN", "GITHUB_TOKEN"):
suppress_credential_source(provider, f"env:{env_var}")
return RemovalResult(hints=[
"Suppressed all copilot token sources (gh_cli + env vars) — they will not be re-seeded.",
"Note: Your gh CLI / shell environment is unchanged.",
"Run `hermes auth add copilot` to re-enable if needed.",
])
def _remove_custom_config(provider: str, removed) -> RemovalResult:
"""Custom provider pools are seeded from custom_providers config or
model.api_key. Both are in config.yaml modifying that from here
is more invasive than suppression. We suppress; the user can edit
config.yaml if they want to remove the key from disk entirely.
"""
source_label = removed.source
return RemovalResult(hints=[
f"Suppressed {source_label} — it will not be re-seeded.",
"Note: The underlying value in config.yaml is unchanged. Edit it "
"directly if you want to remove the credential from disk.",
])
def _register_all_sources() -> None:
"""Called once on module import.
ORDER MATTERS ``find_removal_step`` returns the first match. Put
provider-specific steps before the generic ``env:*`` step so that e.g.
copilot's ``env:GH_TOKEN`` goes through the copilot removal (which
doesn't touch the user's shell), not the generic env-var removal
(which would try to clear .env).
"""
register(RemovalStep(
provider="copilot", source_id="gh_cli",
match_fn=lambda src: src == "gh_cli" or src.startswith("env:"),
remove_fn=_remove_copilot_gh,
description="gh auth token / COPILOT_GITHUB_TOKEN / GH_TOKEN",
))
register(RemovalStep(
provider="*", source_id="env:",
match_fn=lambda src: src.startswith("env:"),
remove_fn=_remove_env_source,
description="Any env-seeded credential (XAI_API_KEY, DEEPSEEK_API_KEY, etc.)",
))
register(RemovalStep(
provider="anthropic", source_id="claude_code",
remove_fn=_remove_claude_code,
description="~/.claude/.credentials.json",
))
register(RemovalStep(
provider="anthropic", source_id="hermes_pkce",
remove_fn=_remove_hermes_pkce,
description="~/.hermes/.anthropic_oauth.json",
))
register(RemovalStep(
provider="nous", source_id="device_code",
remove_fn=_remove_nous_device_code,
description="auth.json providers.nous",
))
register(RemovalStep(
provider="openai-codex", source_id="device_code",
match_fn=lambda src: src == "device_code" or src.endswith(":device_code"),
remove_fn=_remove_codex_device_code,
description="auth.json providers.openai-codex + ~/.codex/auth.json",
))
register(RemovalStep(
provider="qwen-oauth", source_id="qwen-cli",
remove_fn=_remove_qwen_cli,
description="~/.qwen/oauth_creds.json",
))
register(RemovalStep(
provider="*", source_id="config:",
match_fn=lambda src: src.startswith("config:") or src == "model_config",
remove_fn=_remove_custom_config,
description="Custom provider config.yaml api_key field",
))
_register_all_sources()

View file

@ -152,9 +152,11 @@ def auth_add_command(args) -> None:
pool = load_pool(provider) pool = load_pool(provider)
# Clear any env:<VAR> suppressions for this provider — re-adding a # Clear ALL suppressions for this provider — re-adding a credential is
# credential is a strong signal the user wants auth for this provider # a strong signal the user wants auth re-enabled. This covers env:*
# re-enabled. Matches the Codex device_code re-link pattern below. # (shell-exported vars), gh_cli (copilot), claude_code, qwen-cli,
# device_code (codex), etc. One consistent re-engagement pattern.
# Matches the Codex device_code re-link pattern that predates this.
if not provider.startswith(CUSTOM_POOL_PREFIX): if not provider.startswith(CUSTOM_POOL_PREFIX):
try: try:
from hermes_cli.auth import ( from hermes_cli.auth import (
@ -163,7 +165,6 @@ def auth_add_command(args) -> None:
) )
suppressed = _load_auth_store().get("suppressed_sources", {}) suppressed = _load_auth_store().get("suppressed_sources", {})
for src in list(suppressed.get(provider, []) or []): for src in list(suppressed.get(provider, []) or []):
if src.startswith("env:"):
unsuppress_credential_source(provider, src) unsuppress_credential_source(provider, src)
except Exception: except Exception:
pass pass
@ -354,113 +355,28 @@ def auth_remove_command(args) -> None:
raise SystemExit(f'No credential matching "{target}" for provider {provider}.') raise SystemExit(f'No credential matching "{target}" for provider {provider}.')
print(f"Removed {provider} credential #{index} ({removed.label})") print(f"Removed {provider} credential #{index} ({removed.label})")
# If this was an env-seeded credential, also clear the env var from .env # Unified removal dispatch. Every credential source Hermes reads from
# so it doesn't get re-seeded on the next load_pool() call. If the env # (env vars, external OAuth files, auth.json blocks, custom config)
# var is also (or only) exported by the user's shell/systemd, .env # has a RemovalStep registered in agent.credential_sources. The step
# cleanup alone is not enough — the next process to call load_pool() # handles its source-specific cleanup and we centralise suppression +
# will re-read os.environ and resurrect the entry. Suppress the # user-facing output here so every source behaves identically from
# env:<VAR> source so _seed_from_env() skips it, and tell the user # the user's perspective.
# where the shell-level copy is still living so they can remove it. from agent.credential_sources import find_removal_step
if removed.source.startswith("env:"):
import os as _os
env_var = removed.source[len("env:"):]
if env_var:
from hermes_cli.config import get_env_path, remove_env_value
from hermes_cli.auth import suppress_credential_source from hermes_cli.auth import suppress_credential_source
# Detect whether the var lives in .env, the shell env, or both, step = find_removal_step(provider, removed.source)
# BEFORE remove_env_value() mutates os.environ. if step is None:
env_in_process = bool(_os.getenv(env_var)) # Unregistered source — e.g. "manual", which has nothing external
env_in_dotenv = False # to clean up. The pool entry is already gone; we're done.
try: return
env_path = get_env_path()
if env_path.exists():
env_in_dotenv = any(
line.strip().startswith(f"{env_var}=")
for line in env_path.read_text(errors="replace").splitlines()
)
except OSError:
pass
shell_exported = env_in_process and not env_in_dotenv
cleared = remove_env_value(env_var) result = step.remove_fn(provider, removed)
if cleared: for line in result.cleaned:
print(f"Cleared {env_var} from .env") print(line)
if result.suppress:
suppress_credential_source(provider, removed.source) suppress_credential_source(provider, removed.source)
if shell_exported: for line in result.hints:
print( print(line)
f"Note: {env_var} is still set in your shell environment "
f"(not in ~/.hermes/.env)."
)
print(
" Unset it there (shell profile, systemd EnvironmentFile, "
"launchd plist, etc.) or it will keep being visible to Hermes."
)
print(
f" The pool entry is now suppressed — Hermes will ignore "
f"{env_var} until you run `hermes auth add {provider}`."
)
else:
print(
f"Suppressed env:{env_var} — it will not be re-seeded even "
f"if the variable is re-exported later."
)
# If this was a singleton-seeded credential (OAuth device_code, hermes_pkce),
# clear the underlying auth store / credential file so it doesn't get
# re-seeded on the next load_pool() call.
elif provider == "openai-codex" and (
removed.source == "device_code" or removed.source.endswith(":device_code")
):
# Codex tokens live in TWO places: the Hermes auth store and
# ~/.codex/auth.json (the Codex CLI shared file). On every refresh,
# refresh_codex_oauth_pure() writes to both. So clearing only the
# Hermes auth store is not enough — _seed_from_singletons() will
# auto-import from ~/.codex/auth.json on the next load_pool() and
# the removal is instantly undone. Mark the source as suppressed
# so auto-import is skipped; leave ~/.codex/auth.json untouched so
# the Codex CLI itself keeps working.
from hermes_cli.auth import (
_load_auth_store, _save_auth_store, _auth_store_lock,
suppress_credential_source,
)
with _auth_store_lock():
auth_store = _load_auth_store()
providers_dict = auth_store.get("providers")
if isinstance(providers_dict, dict) and provider in providers_dict:
del providers_dict[provider]
_save_auth_store(auth_store)
print(f"Cleared {provider} OAuth tokens from auth store")
suppress_credential_source(provider, "device_code")
print("Suppressed openai-codex device_code source — it will not be re-seeded.")
print("Note: Codex CLI credentials still live in ~/.codex/auth.json")
print("Run `hermes auth add openai-codex` to re-enable if needed.")
elif removed.source == "device_code" and provider == "nous":
from hermes_cli.auth import (
_load_auth_store, _save_auth_store, _auth_store_lock,
)
with _auth_store_lock():
auth_store = _load_auth_store()
providers_dict = auth_store.get("providers")
if isinstance(providers_dict, dict) and provider in providers_dict:
del providers_dict[provider]
_save_auth_store(auth_store)
print(f"Cleared {provider} OAuth tokens from auth store")
elif removed.source == "hermes_pkce" and provider == "anthropic":
from hermes_constants import get_hermes_home
oauth_file = get_hermes_home() / ".anthropic_oauth.json"
if oauth_file.exists():
oauth_file.unlink()
print("Cleared Hermes Anthropic OAuth credentials")
elif removed.source == "claude_code" and provider == "anthropic":
from hermes_cli.auth import suppress_credential_source
suppress_credential_source(provider, "claude_code")
print("Suppressed claude_code credential — it will not be re-seeded.")
print("Note: Claude Code credentials still live in ~/.claude/.credentials.json")
print("Run `hermes auth add anthropic` to re-enable if needed.")
def auth_reset_command(args) -> None: def auth_reset_command(args) -> None:

View file

@ -1185,3 +1185,292 @@ def test_seed_from_env_respects_openrouter_suppression(tmp_path, monkeypatch):
assert changed is False assert changed is False
assert entries == [] assert entries == []
assert active == set() assert active == set()
# =============================================================================
# Unified credential-source stickiness — every source Hermes reads from has a
# registered RemovalStep in agent.credential_sources, and every seeding path
# gates on is_source_suppressed. Below: one test per source proving remove
# sticks across a fresh load_pool() call.
# =============================================================================
def test_seed_from_singletons_respects_nous_suppression(tmp_path, monkeypatch):
"""nous device_code must not re-seed from auth.json when suppressed."""
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
(hermes_home / "auth.json").write_text(json.dumps({
"version": 1,
"providers": {"nous": {"access_token": "tok", "refresh_token": "r", "expires_at": 9999999999}},
"suppressed_sources": {"nous": ["device_code"]},
}))
from agent.credential_pool import _seed_from_singletons
entries = []
changed, active = _seed_from_singletons("nous", entries)
assert changed is False
assert entries == []
assert active == set()
def test_seed_from_singletons_respects_copilot_suppression(tmp_path, monkeypatch):
"""copilot gh_cli must not re-seed when suppressed."""
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
(hermes_home / "auth.json").write_text(json.dumps({
"version": 1,
"providers": {},
"suppressed_sources": {"copilot": ["gh_cli"]},
}))
# Stub resolve_copilot_token to return a live token
import hermes_cli.copilot_auth as ca
monkeypatch.setattr(ca, "resolve_copilot_token", lambda: ("ghp_fake", "gh auth token"))
from agent.credential_pool import _seed_from_singletons
entries = []
changed, active = _seed_from_singletons("copilot", entries)
assert changed is False
assert entries == []
assert active == set()
def test_seed_from_singletons_respects_qwen_suppression(tmp_path, monkeypatch):
"""qwen-oauth qwen-cli must not re-seed from ~/.qwen/oauth_creds.json when suppressed."""
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
(hermes_home / "auth.json").write_text(json.dumps({
"version": 1,
"providers": {},
"suppressed_sources": {"qwen-oauth": ["qwen-cli"]},
}))
import hermes_cli.auth as ha
monkeypatch.setattr(ha, "resolve_qwen_runtime_credentials", lambda **kw: {
"api_key": "tok", "source": "qwen-cli", "base_url": "https://q",
})
from agent.credential_pool import _seed_from_singletons
entries = []
changed, active = _seed_from_singletons("qwen-oauth", entries)
assert changed is False
assert entries == []
assert active == set()
def test_seed_from_singletons_respects_hermes_pkce_suppression(tmp_path, monkeypatch):
"""anthropic hermes_pkce must not re-seed from ~/.hermes/.anthropic_oauth.json when suppressed."""
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
import yaml
(hermes_home / "config.yaml").write_text(yaml.dump({"model": {"provider": "anthropic", "model": "claude"}}))
(hermes_home / "auth.json").write_text(json.dumps({
"version": 1,
"providers": {},
"suppressed_sources": {"anthropic": ["hermes_pkce"]},
}))
# Stub the readers so only hermes_pkce is "available"; claude_code returns None
import agent.anthropic_adapter as aa
monkeypatch.setattr(aa, "read_hermes_oauth_credentials", lambda: {
"accessToken": "tok", "refreshToken": "r", "expiresAt": 9999999999000,
})
monkeypatch.setattr(aa, "read_claude_code_credentials", lambda: None)
from agent.credential_pool import _seed_from_singletons
entries = []
changed, active = _seed_from_singletons("anthropic", entries)
# hermes_pkce suppressed, claude_code returns None → nothing should be seeded
assert entries == []
assert "hermes_pkce" not in active
def test_seed_custom_pool_respects_config_suppression(tmp_path, monkeypatch):
"""Custom provider config:<name> source must not re-seed when suppressed."""
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
import yaml
(hermes_home / "config.yaml").write_text(yaml.dump({
"model": {},
"custom_providers": [
{"name": "my", "base_url": "https://c.example.com", "api_key": "sk-custom"},
],
}))
from agent.credential_pool import _seed_custom_pool, get_custom_provider_pool_key
pool_key = get_custom_provider_pool_key("https://c.example.com")
(hermes_home / "auth.json").write_text(json.dumps({
"version": 1,
"providers": {},
"suppressed_sources": {pool_key: ["config:my"]},
}))
entries = []
changed, active = _seed_custom_pool(pool_key, entries)
assert changed is False
assert entries == []
assert "config:my" not in active
def test_credential_sources_registry_has_expected_steps():
"""Sanity check — the registry contains the expected RemovalSteps.
Guards against accidentally dropping a step during future refactors.
If you add a new credential source, add it to the expected set below.
"""
from agent.credential_sources import _REGISTRY
descriptions = {step.description for step in _REGISTRY}
expected = {
"gh auth token / COPILOT_GITHUB_TOKEN / GH_TOKEN",
"Any env-seeded credential (XAI_API_KEY, DEEPSEEK_API_KEY, etc.)",
"~/.claude/.credentials.json",
"~/.hermes/.anthropic_oauth.json",
"auth.json providers.nous",
"auth.json providers.openai-codex + ~/.codex/auth.json",
"~/.qwen/oauth_creds.json",
"Custom provider config.yaml api_key field",
}
assert descriptions == expected, f"Registry mismatch. Got: {descriptions}"
def test_credential_sources_find_step_returns_none_for_manual():
"""Manual entries have nothing external to clean up — no step registered."""
from agent.credential_sources import find_removal_step
assert find_removal_step("openrouter", "manual") is None
assert find_removal_step("xai", "manual") is None
def test_credential_sources_find_step_copilot_before_generic_env(tmp_path, monkeypatch):
"""copilot env:GH_TOKEN must dispatch to the copilot step, not the
generic env-var step. The copilot step handles the duplicate-source
problem (same token seeded as both gh_cli and env:<VAR>); the generic
env step would only suppress one of the variants.
"""
from agent.credential_sources import find_removal_step
step = find_removal_step("copilot", "env:GH_TOKEN")
assert step is not None
assert "copilot" in step.description.lower() or "gh" in step.description.lower()
# Generic step still matches any other provider's env var
step = find_removal_step("xai", "env:XAI_API_KEY")
assert step is not None
assert "env-seeded" in step.description.lower()
def test_auth_remove_copilot_suppresses_all_variants(tmp_path, monkeypatch):
"""Removing any copilot source must suppress gh_cli + all env:* variants
so the duplicate-seed paths don't resurrect the credential.
"""
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"copilot": [{
"id": "c1",
"label": "gh auth token",
"auth_type": "api_key",
"priority": 0,
"source": "gh_cli",
"access_token": "ghp_fake",
}]
},
},
)
from types import SimpleNamespace
from hermes_cli.auth import is_source_suppressed
from hermes_cli.auth_commands import auth_remove_command
auth_remove_command(SimpleNamespace(provider="copilot", target="1"))
assert is_source_suppressed("copilot", "gh_cli")
assert is_source_suppressed("copilot", "env:COPILOT_GITHUB_TOKEN")
assert is_source_suppressed("copilot", "env:GH_TOKEN")
assert is_source_suppressed("copilot", "env:GITHUB_TOKEN")
def test_auth_add_clears_all_suppressions_including_non_env(tmp_path, monkeypatch):
"""Re-adding a credential via `hermes auth add <provider>` clears ALL
suppression markers for the provider, not just env:*. This matches
the single "re-engage" semantic the user wants auth back, period.
"""
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
_write_auth_store(
tmp_path,
{
"version": 1,
"providers": {},
"suppressed_sources": {
"copilot": ["gh_cli", "env:GH_TOKEN", "env:COPILOT_GITHUB_TOKEN"],
},
},
)
from types import SimpleNamespace
from hermes_cli.auth import is_source_suppressed
from hermes_cli.auth_commands import auth_add_command
auth_add_command(SimpleNamespace(
provider="copilot", auth_type="api_key",
api_key="ghp-manual", label="m",
))
assert not is_source_suppressed("copilot", "gh_cli")
assert not is_source_suppressed("copilot", "env:GH_TOKEN")
assert not is_source_suppressed("copilot", "env:COPILOT_GITHUB_TOKEN")
def test_auth_remove_codex_manual_device_code_suppresses_canonical(tmp_path, monkeypatch):
"""Removing a manual:device_code entry (from `hermes auth add openai-codex`)
must suppress the canonical ``device_code`` key, not ``manual:device_code``.
The re-seed gate in _seed_from_singletons checks ``device_code``.
"""
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
_write_auth_store(
tmp_path,
{
"version": 1,
"providers": {"openai-codex": {"tokens": {"access_token": "t", "refresh_token": "r"}}},
"credential_pool": {
"openai-codex": [{
"id": "cdx",
"label": "manual-codex",
"auth_type": "oauth",
"priority": 0,
"source": "manual:device_code",
"access_token": "t",
}]
},
},
)
from types import SimpleNamespace
from hermes_cli.auth import is_source_suppressed
from hermes_cli.auth_commands import auth_remove_command
auth_remove_command(SimpleNamespace(provider="openai-codex", target="1"))
assert is_source_suppressed("openai-codex", "device_code")