diff --git a/hermes_cli/auth.py b/hermes_cli/auth.py index a65e9ea78b8..9017b682f77 100644 --- a/hermes_cli/auth.py +++ b/hermes_cli/auth.py @@ -71,6 +71,7 @@ DEFAULT_NOUS_PORTAL_URL = "https://portal.nousresearch.com" DEFAULT_NOUS_INFERENCE_URL = "https://inference-api.nousresearch.com/v1" DEFAULT_NOUS_CLIENT_ID = "hermes-cli" NOUS_INFERENCE_INVOKE_SCOPE = "inference:invoke" +NOUS_BILLING_MANAGE_SCOPE = "billing:manage" DEFAULT_NOUS_SCOPE = NOUS_INFERENCE_INVOKE_SCOPE NOUS_DEVICE_CODE_SOURCE = "device_code" NOUS_AUTH_PATH_INVOKE_JWT = "invoke_jwt" @@ -7628,6 +7629,89 @@ def _nous_device_code_login( raise +def nous_token_has_billing_scope() -> bool: + """Return True if the currently-held Nous token carries ``billing:manage``. + + Reads the persisted ``scope`` string saved at login (``_save_provider_state`` + stores ``token_data.get("scope") or scope``). A space-delimited match. Used by + the lazy step-up: if False, the first billing call will 403 ``insufficient_scope`` + anyway, but checking up front lets a surface skip a doomed round-trip. + """ + try: + state = get_provider_auth_state("nous") or {} + except Exception: + return False + scope = state.get("scope") + if not isinstance(scope, str): + return False + return NOUS_BILLING_MANAGE_SCOPE in scope.split() + + +def step_up_nous_billing_scope( + *, + open_browser: bool = True, + timeout_seconds: float = 15.0, +) -> bool: + """Re-run the device flow requesting ``billing:manage`` and persist the result. + + The lazy step-up (plan D-A): triggered when a billing endpoint returns + ``403 insufficient_scope``. Runs a fresh device-connect with + ``inference:invoke tool:invoke billing:manage`` on the scope. The user must be + an ADMIN/OWNER and tick "Allow terminal billing" in the portal for the minted + token to actually carry the scope; otherwise NAS silently downscopes and this + returns False. + + Reuses the held credential's portal/inference URLs + client_id so the step-up + targets the same deployment (incl. a preview via ``HERMES_PORTAL_BASE_URL`` set + at the original login). Persists to the auth store + shared store + pool, exactly + like ``_login_nous`` — but WITHOUT the model picker (this is a scope upgrade, not + a fresh login). + + Returns True iff the new token carries ``billing:manage``. + """ + prior = get_provider_auth_state("nous") or {} + pconfig = PROVIDER_REGISTRY["nous"] + + # Build the step-up scope: existing scopes (if any) + billing:manage, deduped, + # order-stable. Fall back to the standard inference+tool+billing set. + _raw_scope = prior.get("scope") + prior_scope = _raw_scope if isinstance(_raw_scope, str) else "" + requested: list[str] = [] + for tok in (prior_scope.split() or [NOUS_INFERENCE_INVOKE_SCOPE, "tool:invoke"]): + if tok and tok not in requested: + requested.append(tok) + if NOUS_BILLING_MANAGE_SCOPE not in requested: + requested.append(NOUS_BILLING_MANAGE_SCOPE) + scope = " ".join(requested) + + auth_state = _nous_device_code_login( + portal_base_url=prior.get("portal_base_url") or None, + inference_base_url=prior.get("inference_base_url") or None, + client_id=prior.get("client_id") or pconfig.client_id, + scope=scope, + open_browser=open_browser, + timeout_seconds=timeout_seconds, + ) + + with _auth_store_lock(): + auth_store = _load_auth_store() + _save_provider_state(auth_store, "nous", auth_state) + _save_auth_store(auth_store) + + # Mirror to shared store + reseed the pool (best-effort), same as _login_nous. + try: + _write_shared_nous_state(auth_state) + except Exception: + pass + try: + _sync_nous_pool_from_auth_store() + except Exception: + pass + + granted = auth_state.get("scope") + return isinstance(granted, str) and NOUS_BILLING_MANAGE_SCOPE in granted.split() + + def _login_nous(args, pconfig: ProviderConfig) -> None: """Nous Portal device authorization flow.""" timeout_seconds = getattr(args, "timeout", None) or 15.0 diff --git a/tests/hermes_cli/test_billing_scope_stepup.py b/tests/hermes_cli/test_billing_scope_stepup.py new file mode 100644 index 00000000000..5ec0ceef6ca --- /dev/null +++ b/tests/hermes_cli/test_billing_scope_stepup.py @@ -0,0 +1,127 @@ +"""Tests for the Phase 2b billing:manage scope step-up (auth.py).""" + +from __future__ import annotations + +import pytest + +import hermes_cli.auth as auth +from hermes_cli.auth import ( + NOUS_BILLING_MANAGE_SCOPE, + nous_token_has_billing_scope, + step_up_nous_billing_scope, +) + + +# --------------------------------------------------------------------------- +# nous_token_has_billing_scope +# --------------------------------------------------------------------------- + + +def test_has_scope_true_when_present(monkeypatch): + monkeypatch.setattr( + auth, + "get_provider_auth_state", + lambda p: {"scope": "inference:invoke tool:invoke billing:manage"}, + ) + assert nous_token_has_billing_scope() is True + + +def test_has_scope_false_when_absent(monkeypatch): + monkeypatch.setattr( + auth, "get_provider_auth_state", lambda p: {"scope": "inference:invoke tool:invoke"} + ) + assert nous_token_has_billing_scope() is False + + +def test_has_scope_false_when_no_state(monkeypatch): + monkeypatch.setattr(auth, "get_provider_auth_state", lambda p: None) + assert nous_token_has_billing_scope() is False + + +def test_has_scope_no_substring_false_positive(monkeypatch): + # "billing:manage-lite" must NOT match billing:manage (split-based, not substring). + monkeypatch.setattr( + auth, "get_provider_auth_state", lambda p: {"scope": "billing:manage-lite"} + ) + assert nous_token_has_billing_scope() is False + + +# --------------------------------------------------------------------------- +# step_up_nous_billing_scope +# --------------------------------------------------------------------------- + + +@pytest.fixture +def _stub_persist(monkeypatch): + """Neutralize the persistence side-effects so step-up tests are pure.""" + monkeypatch.setattr(auth, "_auth_store_lock", lambda: _NullCtx()) + monkeypatch.setattr(auth, "_load_auth_store", lambda: {}) + monkeypatch.setattr(auth, "_save_provider_state", lambda *a, **kw: None) + monkeypatch.setattr(auth, "_save_auth_store", lambda *a, **kw: "auth.json") + monkeypatch.setattr(auth, "_write_shared_nous_state", lambda *a, **kw: None) + monkeypatch.setattr(auth, "_sync_nous_pool_from_auth_store", lambda: None) + + +class _NullCtx: + def __enter__(self): + return self + + def __exit__(self, *a): + return False + + +def test_step_up_requests_billing_scope_and_reuses_prior_urls(monkeypatch, _stub_persist): + monkeypatch.setattr( + auth, + "get_provider_auth_state", + lambda p: { + "scope": "inference:invoke tool:invoke", + "portal_base_url": "https://preview.example.com", + "inference_base_url": "https://inf.example.com", + "client_id": "hermes-cli", + }, + ) + captured = {} + + def _fake_login(**kw): + captured.update(kw) + # Simulate the admin ticking the box → token comes back WITH the scope. + return {"scope": "inference:invoke tool:invoke billing:manage", "access_token": "t"} + + monkeypatch.setattr(auth, "_nous_device_code_login", _fake_login) + + granted = step_up_nous_billing_scope() + assert granted is True + # Requested scope must include billing:manage, preserving prior scopes. + assert NOUS_BILLING_MANAGE_SCOPE in captured["scope"].split() + assert "inference:invoke" in captured["scope"].split() + # Reuses the prior credential's deployment URLs (so a preview stays a preview). + assert captured["portal_base_url"] == "https://preview.example.com" + assert captured["client_id"] == "hermes-cli" + + +def test_step_up_returns_false_when_downscoped(monkeypatch, _stub_persist): + # Non-admin / unticked → NAS silently downscopes; token comes back WITHOUT scope. + monkeypatch.setattr(auth, "get_provider_auth_state", lambda p: {"scope": "inference:invoke"}) + monkeypatch.setattr( + auth, + "_nous_device_code_login", + lambda **kw: {"scope": "inference:invoke", "access_token": "t"}, + ) + assert step_up_nous_billing_scope() is False + + +def test_step_up_falls_back_to_standard_scope_when_no_prior(monkeypatch, _stub_persist): + monkeypatch.setattr(auth, "get_provider_auth_state", lambda p: {}) + captured = {} + + def _fake_login(**kw): + captured.update(kw) + return {"scope": "inference:invoke tool:invoke billing:manage"} + + monkeypatch.setattr(auth, "_nous_device_code_login", _fake_login) + step_up_nous_billing_scope() + requested = captured["scope"].split() + assert "inference:invoke" in requested + assert "tool:invoke" in requested + assert NOUS_BILLING_MANAGE_SCOPE in requested