fix: re-auth on stale OAuth token; read Claude Code credentials from macOS Keychain

Bug 3 — Stale OAuth token not detected in 'hermes model':
- _model_flow_anthropic used 'has_creds = bool(existing_key)' which treats
  any non-empty token (including expired OAuth tokens) as valid.
- Added existing_is_stale_oauth check: if the only credential is an OAuth
  token (sk-ant- prefix) with no valid cc_creds fallback, mark it stale
  and force the re-auth menu instead of silently accepting a broken token.

Bug 4 — macOS Keychain credentials never read:
- Claude Code >=2.1.114 migrated from ~/.claude/.credentials.json to the
  macOS Keychain under service 'Claude Code-credentials'.
- Added _read_claude_code_credentials_from_keychain() using the 'security'
  CLI tool; read_claude_code_credentials() now tries Keychain first then
  falls back to JSON file.
- Non-Darwin platforms return None from Keychain read immediately.

Tests:
- tests/agent/test_anthropic_keychain.py: 11 cases covering Darwin-only
  guard, security command failures, JSON parsing, fallback priority.
- tests/hermes_cli/test_anthropic_model_flow_stale_oauth.py: 8 cases
  covering stale OAuth detection, API key passthrough, cc_creds fallback.

Refs: #12905
This commit is contained in:
5park1e 2026-04-20 17:41:27 +08:00 committed by Teknium
parent 5383615db5
commit e1106772d9
4 changed files with 458 additions and 2 deletions

View file

@ -14,6 +14,8 @@ import copy
import json
import logging
import os
import platform
import subprocess
from pathlib import Path
from hermes_constants import get_hermes_home
@ -465,8 +467,72 @@ def build_anthropic_bedrock_client(region: str):
)
def _read_claude_code_credentials_from_keychain() -> Optional[Dict[str, Any]]:
"""Read Claude Code OAuth credentials from the macOS Keychain.
Claude Code >=2.1.114 stores credentials in the macOS Keychain under the
service name "Claude Code-credentials" rather than (or in addition to)
the JSON file at ~/.claude/.credentials.json.
The password field contains a JSON string with the same claudeAiOauth
structure as the JSON file.
Returns dict with {accessToken, refreshToken?, expiresAt?} or None.
"""
import platform
import subprocess
if platform.system() != "Darwin":
return None
try:
# Read the "Claude Code-credentials" generic password entry
result = subprocess.run(
["security", "find-generic-password",
"-s", "Claude Code-credentials",
"-w"],
capture_output=True,
text=True,
timeout=5,
)
except (OSError, subprocess.TimeoutExpired):
logger.debug("Keychain: security command not available or timed out")
return None
if result.returncode != 0:
logger.debug("Keychain: no entry found for 'Claude Code-credentials'")
return None
raw = result.stdout.strip()
if not raw:
return None
try:
data = json.loads(raw)
except json.JSONDecodeError:
logger.debug("Keychain: credentials payload is not valid JSON")
return None
oauth_data = data.get("claudeAiOauth")
if oauth_data and isinstance(oauth_data, dict):
access_token = oauth_data.get("accessToken", "")
if access_token:
return {
"accessToken": access_token,
"refreshToken": oauth_data.get("refreshToken", ""),
"expiresAt": oauth_data.get("expiresAt", 0),
"source": "macos_keychain",
}
return None
def read_claude_code_credentials() -> Optional[Dict[str, Any]]:
"""Read refreshable Claude Code OAuth credentials from ~/.claude/.credentials.json.
"""Read refreshable Claude Code OAuth credentials.
Checks two sources in order:
1. macOS Keychain (Darwin only) "Claude Code-credentials" entry
2. ~/.claude/.credentials.json file
This intentionally excludes ~/.claude.json primaryApiKey. Opencode's
subscription flow is OAuth/setup-token based with refreshable credentials,
@ -475,6 +541,12 @@ def read_claude_code_credentials() -> Optional[Dict[str, Any]]:
Returns dict with {accessToken, refreshToken?, expiresAt?} or None.
"""
# Try macOS Keychain first (covers Claude Code >=2.1.114)
kc_creds = _read_claude_code_credentials_from_keychain()
if kc_creds:
return kc_creds
# Fall back to JSON file
cred_path = Path.home() / ".claude" / ".credentials.json"
if cred_path.exists():
try:

View file

@ -4316,6 +4316,8 @@ def _model_flow_anthropic(config, current_model=""):
from agent.anthropic_adapter import (
read_claude_code_credentials,
is_claude_code_token_valid,
_is_oauth_token,
_resolve_claude_code_token_from_credentials,
)
cc_creds = read_claude_code_credentials()
@ -4324,7 +4326,14 @@ def _model_flow_anthropic(config, current_model=""):
except Exception:
pass
has_creds = bool(existing_key) or cc_available
# Stale-OAuth guard: if the only existing cred is an expired OAuth token
# (no valid cc_creds to fall back on), treat it as missing so the re-auth
# path is offered instead of silently accepting a broken token.
existing_is_stale_oauth = False
if existing_key and _is_oauth_token(existing_key) and not cc_available:
existing_is_stale_oauth = True
has_creds = (bool(existing_key) and not existing_is_stale_oauth) or cc_available
needs_auth = not has_creds
if has_creds:

View file

@ -0,0 +1,165 @@
"""Tests for Bug #12905 fixes in agent/anthropic_adapter.py — macOS Keychain support."""
import json
import platform
from unittest.mock import patch, MagicMock
import pytest
from agent.anthropic_adapter import (
_read_claude_code_credentials_from_keychain,
read_claude_code_credentials,
)
class TestReadClaudeCodeCredentialsFromKeychain:
"""Bug 4: macOS Keychain support for Claude Code >=2.1.114."""
def test_returns_none_on_linux(self):
"""Keychain reading is Darwin-only; must return None on other platforms."""
with patch("agent.anthropic_adapter.platform.system", return_value="Linux"):
assert _read_claude_code_credentials_from_keychain() is None
def test_returns_none_on_windows(self):
with patch("agent.anthropic_adapter.platform.system", return_value="Windows"):
assert _read_claude_code_credentials_from_keychain() is None
def test_returns_none_when_security_command_not_found(self):
"""OSError from missing security binary must be handled gracefully."""
with patch("agent.anthropic_adapter.platform.system", return_value="Darwin"), \
patch("agent.anthropic_adapter.subprocess.run",
side_effect=OSError("security not found")):
assert _read_claude_code_credentials_from_keychain() is None
def test_returns_none_on_nonzero_exit_code(self):
"""security returns non-zero when the Keychain entry doesn't exist."""
with patch("agent.anthropic_adapter.platform.system", return_value="Darwin"), \
patch("agent.anthropic_adapter.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=1, stdout="", stderr="")
assert _read_claude_code_credentials_from_keychain() is None
def test_returns_none_for_empty_stdout(self):
with patch("agent.anthropic_adapter.platform.system", return_value="Darwin"), \
patch("agent.anthropic_adapter.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="")
assert _read_claude_code_credentials_from_keychain() is None
def test_returns_none_for_non_json_payload(self):
with patch("agent.anthropic_adapter.platform.system", return_value="Darwin"), \
patch("agent.anthropic_adapter.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stdout="not valid json", stderr="")
assert _read_claude_code_credentials_from_keychain() is None
def test_returns_none_when_password_field_is_missing_claude_ai_oauth(self):
with patch("agent.anthropic_adapter.platform.system", return_value="Darwin"), \
patch("agent.anthropic_adapter.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(
returncode=0,
stdout=json.dumps({"someOtherService": {"accessToken": "tok"}}),
stderr="",
)
assert _read_claude_code_credentials_from_keychain() is None
def test_returns_none_when_access_token_is_empty(self):
with patch("agent.anthropic_adapter.platform.system", return_value="Darwin"), \
patch("agent.anthropic_adapter.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(
returncode=0,
stdout=json.dumps({"claudeAiOauth": {"accessToken": "", "refreshToken": "x"}}),
stderr="",
)
assert _read_claude_code_credentials_from_keychain() is None
def test_parses_valid_keychain_entry(self):
with patch("agent.anthropic_adapter.platform.system", return_value="Darwin"), \
patch("agent.anthropic_adapter.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(
returncode=0,
stdout=json.dumps({
"claudeAiOauth": {
"accessToken": "kc-access-token-abc",
"refreshToken": "kc-refresh-token-xyz",
"expiresAt": 9999999999999,
}
}),
stderr="",
)
creds = _read_claude_code_credentials_from_keychain()
assert creds is not None
assert creds["accessToken"] == "kc-access-token-abc"
assert creds["refreshToken"] == "kc-refresh-token-xyz"
assert creds["expiresAt"] == 9999999999999
assert creds["source"] == "macos_keychain"
class TestReadClaudeCodeCredentialsPriority:
"""Bug 4: Keychain must be checked before the JSON file."""
def test_keychain_takes_priority_over_json_file(self, tmp_path, monkeypatch):
"""When both Keychain and JSON file have credentials, Keychain wins."""
# Set up JSON file with "older" token
json_cred_file = tmp_path / ".claude" / ".credentials.json"
json_cred_file.parent.mkdir(parents=True)
json_cred_file.write_text(json.dumps({
"claudeAiOauth": {
"accessToken": "json-token",
"refreshToken": "json-refresh",
"expiresAt": 9999999999999,
}
}))
monkeypatch.setattr("agent.anthropic_adapter.Path.home", lambda: tmp_path)
# Mock Keychain to return a "newer" token
with patch("agent.anthropic_adapter.platform.system", return_value="Darwin"), \
patch("agent.anthropic_adapter.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(
returncode=0,
stdout=json.dumps({
"claudeAiOauth": {
"accessToken": "keychain-token",
"refreshToken": "keychain-refresh",
"expiresAt": 9999999999999,
}
}),
stderr="",
)
creds = read_claude_code_credentials()
# Keychain token should be returned, not JSON file token
assert creds is not None
assert creds["accessToken"] == "keychain-token"
assert creds["source"] == "macos_keychain"
def test_falls_back_to_json_when_keychain_returns_none(self, tmp_path, monkeypatch):
"""When Keychain has no entry, JSON file is used as fallback."""
json_cred_file = tmp_path / ".claude" / ".credentials.json"
json_cred_file.parent.mkdir(parents=True)
json_cred_file.write_text(json.dumps({
"claudeAiOauth": {
"accessToken": "json-fallback-token",
"refreshToken": "json-refresh",
"expiresAt": 9999999999999,
}
}))
monkeypatch.setattr("agent.anthropic_adapter.Path.home", lambda: tmp_path)
with patch("agent.anthropic_adapter.platform.system", return_value="Darwin"), \
patch("agent.anthropic_adapter.subprocess.run") as mock_run:
# Simulate Keychain entry not found
mock_run.return_value = MagicMock(returncode=1, stdout="", stderr="")
creds = read_claude_code_credentials()
assert creds is not None
assert creds["accessToken"] == "json-fallback-token"
assert creds["source"] == "claude_code_credentials_file"
def test_returns_none_when_neither_keychain_nor_json_has_creds(self, tmp_path, monkeypatch):
"""No credentials anywhere — must return None cleanly."""
monkeypatch.setattr("agent.anthropic_adapter.Path.home", lambda: tmp_path)
with patch("agent.anthropic_adapter.platform.system", return_value="Darwin"), \
patch("agent.anthropic_adapter.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=1, stdout="", stderr="")
creds = read_claude_code_credentials()
assert creds is None

View file

@ -0,0 +1,210 @@
"""Tests for Bug #12905 fix — stale OAuth token detection in hermes model flow.
Bug 3: `hermes model` with `provider=anthropic` skips OAuth re-authentication
when a stale ANTHROPIC_TOKEN exists in ~/.hermes/.env but no valid
Claude Code credentials are available. The fast-path silently proceeds to
model selection with a broken token instead of offering re-auth.
"""
import json
import pytest
from unittest.mock import patch, MagicMock
from hermes_cli.config import load_env, save_env_value
class TestStaleOAuthTokenDetection:
"""Bug 3: stale OAuth token must trigger needs_auth=True in _model_flow_anthropic."""
def test_stale_oauth_token_triggers_reauth(self, tmp_path, monkeypatch, capsys):
"""
Scenario: ANTHROPIC_TOKEN is an expired OAuth token and there are no
valid Claude Code credentials anywhere. The flow MUST offer re-auth
instead of silently skipping to model selection.
"""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
# Pre-load .env with an expired OAuth token (sk-ant- prefix = OAuth)
save_env_value("ANTHROPIC_TOKEN", "sk-ant-oat-ExpiredToken00000")
save_env_value("ANTHROPIC_API_KEY", "")
# No valid Claude Code credentials available (expired, no refresh token)
monkeypatch.setattr(
"agent.anthropic_adapter.read_claude_code_credentials",
lambda: {
"accessToken": "expired-cc-token",
"refreshToken": "", # No refresh — can't recover
"expiresAt": 0, # Already expired
"source": "claude_code_credentials_file",
},
)
monkeypatch.setattr(
"agent.anthropic_adapter.is_claude_code_token_valid",
lambda creds: False, # Explicitly expired
)
monkeypatch.setattr(
"agent.anthropic_adapter._is_oauth_token",
lambda key: key.startswith("sk-ant-"),
)
# _resolve_claude_code_token_from_credentials has no valid path
monkeypatch.setattr(
"agent.anthropic_adapter._resolve_claude_code_token_from_credentials",
lambda creds=None: None,
)
# Simulate user types "3" (Cancel) when prompted for re-auth
monkeypatch.setattr("builtins.input", lambda _: "3")
monkeypatch.setattr("getpass.getpass", lambda _: "")
from hermes_cli.main import _model_flow_anthropic
cfg = {}
_model_flow_anthropic(cfg)
output = capsys.readouterr().out
# Must show auth method choice since token is stale
assert "subscription" in output or "API key" in output, (
f"Expected auth method menu but got: {output!r}"
)
def test_valid_api_key_skips_stale_check(self, tmp_path, monkeypatch, capsys):
"""
A non-OAuth ANTHROPIC_API_KEY (regular pay-per-token key) must NOT be
flagged as stale even when cc_creds are invalid. Regular API keys don't
expire the same way OAuth tokens do.
"""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
# Regular API key — NOT an OAuth token
save_env_value("ANTHROPIC_API_KEY", "sk-ant-api03-RegularPayPerTokenKey")
save_env_value("ANTHROPIC_TOKEN", "")
monkeypatch.setattr(
"agent.anthropic_adapter.read_claude_code_credentials",
lambda: None, # No CC creds
)
monkeypatch.setattr(
"agent.anthropic_adapter.is_claude_code_token_valid",
lambda creds: False,
)
monkeypatch.setattr(
"agent.anthropic_adapter._is_oauth_token",
lambda key: key.startswith("sk-ant-") and "oat" in key,
)
# Simulate user picks "1" (use existing)
monkeypatch.setattr("builtins.input", lambda _: "1")
from hermes_cli.main import _model_flow_anthropic
cfg = {}
_model_flow_anthropic(cfg)
output = capsys.readouterr().out
# Should show "Use existing credentials" menu, NOT auth method choice
assert "Use existing" in output or "credentials" in output.lower()
def test_valid_oauth_token_with_refresh_available_skips_reauth(self, tmp_path, monkeypatch, capsys):
"""
When ANTHROPIC_TOKEN is OAuth and valid cc_creds with refresh exist,
the flow should use existing credentials (no forced re-auth).
"""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
save_env_value("ANTHROPIC_TOKEN", "sk-ant-oat-GoodOAuthToken")
save_env_value("ANTHROPIC_API_KEY", "")
# Valid Claude Code credentials with refresh token
monkeypatch.setattr(
"agent.anthropic_adapter.read_claude_code_credentials",
lambda: {
"accessToken": "valid-cc-token",
"refreshToken": "valid-refresh",
"expiresAt": 9999999999999,
},
)
monkeypatch.setattr(
"agent.anthropic_adapter.is_claude_code_token_valid",
lambda creds: True,
)
monkeypatch.setattr(
"agent.anthropic_adapter._is_oauth_token",
lambda key: key.startswith("sk-ant-"),
)
monkeypatch.setattr(
"agent.anthropic_adapter._resolve_claude_code_token_from_credentials",
lambda creds=None: "valid-cc-token",
)
# Simulate user picks "1" (use existing)
monkeypatch.setattr("builtins.input", lambda _: "1")
from hermes_cli.main import _model_flow_anthropic
cfg = {}
_model_flow_anthropic(cfg)
output = capsys.readouterr().out
# Should show "Use existing" without forcing re-auth
assert "Use existing" in output or "credentials" in output.lower()
class TestStaleOAuthGuardLogic:
"""Unit-level test of the stale-OAuth detection guard logic."""
def test_stale_oauth_flag_logic_no_cc_creds(self):
"""
When existing_key is OAuth and cc_available is False,
existing_is_stale_oauth should be True has_creds = False.
"""
existing_key = "sk-ant-oat-expiredtoken123"
_is_oauth_token = lambda k: k.startswith("sk-ant-")
cc_available = False
existing_is_stale_oauth = (
bool(existing_key) and
_is_oauth_token(existing_key) and
not cc_available
)
has_creds = (bool(existing_key) and not existing_is_stale_oauth) or cc_available
assert existing_is_stale_oauth is True
assert has_creds is False
def test_stale_oauth_flag_logic_with_valid_cc_creds(self):
"""
When existing_key is OAuth but cc_available is True (valid creds exist),
has_creds should be True the cc_creds will be used instead.
"""
existing_key = "sk-ant-oat-sometoken"
_is_oauth_token = lambda k: k.startswith("sk-ant-")
cc_available = True
existing_is_stale_oauth = (
bool(existing_key) and
_is_oauth_token(existing_key) and
not cc_available
)
has_creds = (bool(existing_key) and not existing_is_stale_oauth) or cc_available
assert existing_is_stale_oauth is False
assert has_creds is True
def test_non_oauth_key_not_flagged_as_stale(self):
"""
Regular ANTHROPIC_API_KEY (non-OAuth) must not be flagged as stale
even when cc_available is False.
"""
existing_key = "sk-ant-api03-regular-key"
_is_oauth_token = lambda k: k.startswith("sk-ant-") and "oat" in k
cc_available = False
existing_is_stale_oauth = (
bool(existing_key) and
_is_oauth_token(existing_key) and
not cc_available
)
has_creds = (bool(existing_key) and not existing_is_stale_oauth) or cc_available
assert existing_is_stale_oauth is False
assert has_creds is True