diff --git a/agent/credential_pool.py b/agent/credential_pool.py index e5127ad7d..8a2fecf5d 100644 --- a/agent/credential_pool.py +++ b/agent/credential_pool.py @@ -1176,6 +1176,35 @@ def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> Tup except Exception as exc: logger.debug("Copilot token seed failed: %s", exc) + elif provider == "qwen-oauth": + # Qwen OAuth tokens live in ~/.qwen/oauth_creds.json, written by + # the Qwen CLI (`qwen auth qwen-oauth`). They aren't in the + # Hermes auth store or env vars, so resolve them here. + # Use refresh_if_expiring=False to avoid network calls during + # pool loading / provider discovery. + try: + from hermes_cli.auth import resolve_qwen_runtime_credentials + creds = resolve_qwen_runtime_credentials(refresh_if_expiring=False) + token = creds.get("api_key", "") + if token: + source_name = creds.get("source", "qwen-cli") + active_sources.add(source_name) + changed |= _upsert_entry( + entries, + provider, + source_name, + { + "source": source_name, + "auth_type": AUTH_TYPE_OAUTH, + "access_token": token, + "expires_at_ms": creds.get("expires_at_ms"), + "base_url": creds.get("base_url", ""), + "label": creds.get("auth_file", source_name), + }, + ) + except Exception as exc: + logger.debug("Qwen OAuth token seed failed: %s", exc) + elif provider == "openai-codex": state = _load_provider_state(auth_store, "openai-codex") tokens = state.get("tokens") if isinstance(state, dict) else None diff --git a/tests/agent/test_credential_pool.py b/tests/agent/test_credential_pool.py index 466d92153..ca232c12f 100644 --- a/tests/agent/test_credential_pool.py +++ b/tests/agent/test_credential_pool.py @@ -1108,3 +1108,51 @@ def test_load_pool_does_not_seed_copilot_when_no_token(tmp_path, monkeypatch): assert not pool.has_credentials() assert pool.entries() == [] + + +def test_load_pool_seeds_qwen_oauth_via_cli_tokens(tmp_path, monkeypatch): + """Qwen OAuth credentials from ~/.qwen/oauth_creds.json should be seeded into the pool.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes")) + _write_auth_store(tmp_path, {"version": 1, "credential_pool": {}}) + + monkeypatch.setattr( + "hermes_cli.auth.resolve_qwen_runtime_credentials", + lambda **kw: { + "provider": "qwen-oauth", + "base_url": "https://portal.qwen.ai/v1", + "api_key": "qwen_fake_token_xyz", + "source": "qwen-cli", + "expires_at_ms": 1900000000000, + "auth_file": str(tmp_path / ".qwen" / "oauth_creds.json"), + }, + ) + + from agent.credential_pool import load_pool + pool = load_pool("qwen-oauth") + + assert pool.has_credentials() + entries = pool.entries() + assert len(entries) == 1 + assert entries[0].source == "qwen-cli" + assert entries[0].access_token == "qwen_fake_token_xyz" + + +def test_load_pool_does_not_seed_qwen_oauth_when_no_token(tmp_path, monkeypatch): + """Qwen OAuth pool should be empty when no CLI credentials exist.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes")) + _write_auth_store(tmp_path, {"version": 1, "credential_pool": {}}) + + from hermes_cli.auth import AuthError + + monkeypatch.setattr( + "hermes_cli.auth.resolve_qwen_runtime_credentials", + lambda **kw: (_ for _ in ()).throw( + AuthError("Qwen CLI credentials not found.", provider="qwen-oauth", code="qwen_auth_missing") + ), + ) + + from agent.credential_pool import load_pool + pool = load_pool("qwen-oauth") + + assert not pool.has_credentials() + assert pool.entries() == []