feat(billing): billing:manage scope + lazy step-up re-auth (phase 2b)

- NOUS_BILLING_MANAGE_SCOPE constant.
- nous_token_has_billing_scope(): split-based scope check (no false-positive
  substring match).
- step_up_nous_billing_scope(): re-runs the device flow requesting
  billing:manage, reusing the held credential's portal/inference URLs + client_id
  (so a preview stays a preview), persists like _login_nous but WITHOUT the model
  picker. Returns True iff the minted token carries the scope (False when NAS
  silently downscopes a non-admin / unticked grant).

Lazy step-up (plan D-A): normal login path unchanged; 403 insufficient_scope
from a billing call triggers this. 7 unit tests.
This commit is contained in:
alt-glitch 2026-06-13 11:53:02 +05:30
parent d869bde319
commit 2275fa79ca
2 changed files with 211 additions and 0 deletions

View file

@ -71,6 +71,7 @@ DEFAULT_NOUS_PORTAL_URL = "https://portal.nousresearch.com"
DEFAULT_NOUS_INFERENCE_URL = "https://inference-api.nousresearch.com/v1"
DEFAULT_NOUS_CLIENT_ID = "hermes-cli"
NOUS_INFERENCE_INVOKE_SCOPE = "inference:invoke"
NOUS_BILLING_MANAGE_SCOPE = "billing:manage"
DEFAULT_NOUS_SCOPE = NOUS_INFERENCE_INVOKE_SCOPE
NOUS_DEVICE_CODE_SOURCE = "device_code"
NOUS_AUTH_PATH_INVOKE_JWT = "invoke_jwt"
@ -7628,6 +7629,89 @@ def _nous_device_code_login(
raise
def nous_token_has_billing_scope() -> bool:
"""Return True if the currently-held Nous token carries ``billing:manage``.
Reads the persisted ``scope`` string saved at login (``_save_provider_state``
stores ``token_data.get("scope") or scope``). A space-delimited match. Used by
the lazy step-up: if False, the first billing call will 403 ``insufficient_scope``
anyway, but checking up front lets a surface skip a doomed round-trip.
"""
try:
state = get_provider_auth_state("nous") or {}
except Exception:
return False
scope = state.get("scope")
if not isinstance(scope, str):
return False
return NOUS_BILLING_MANAGE_SCOPE in scope.split()
def step_up_nous_billing_scope(
*,
open_browser: bool = True,
timeout_seconds: float = 15.0,
) -> bool:
"""Re-run the device flow requesting ``billing:manage`` and persist the result.
The lazy step-up (plan D-A): triggered when a billing endpoint returns
``403 insufficient_scope``. Runs a fresh device-connect with
``inference:invoke tool:invoke billing:manage`` on the scope. The user must be
an ADMIN/OWNER and tick "Allow terminal billing" in the portal for the minted
token to actually carry the scope; otherwise NAS silently downscopes and this
returns False.
Reuses the held credential's portal/inference URLs + client_id so the step-up
targets the same deployment (incl. a preview via ``HERMES_PORTAL_BASE_URL`` set
at the original login). Persists to the auth store + shared store + pool, exactly
like ``_login_nous`` but WITHOUT the model picker (this is a scope upgrade, not
a fresh login).
Returns True iff the new token carries ``billing:manage``.
"""
prior = get_provider_auth_state("nous") or {}
pconfig = PROVIDER_REGISTRY["nous"]
# Build the step-up scope: existing scopes (if any) + billing:manage, deduped,
# order-stable. Fall back to the standard inference+tool+billing set.
_raw_scope = prior.get("scope")
prior_scope = _raw_scope if isinstance(_raw_scope, str) else ""
requested: list[str] = []
for tok in (prior_scope.split() or [NOUS_INFERENCE_INVOKE_SCOPE, "tool:invoke"]):
if tok and tok not in requested:
requested.append(tok)
if NOUS_BILLING_MANAGE_SCOPE not in requested:
requested.append(NOUS_BILLING_MANAGE_SCOPE)
scope = " ".join(requested)
auth_state = _nous_device_code_login(
portal_base_url=prior.get("portal_base_url") or None,
inference_base_url=prior.get("inference_base_url") or None,
client_id=prior.get("client_id") or pconfig.client_id,
scope=scope,
open_browser=open_browser,
timeout_seconds=timeout_seconds,
)
with _auth_store_lock():
auth_store = _load_auth_store()
_save_provider_state(auth_store, "nous", auth_state)
_save_auth_store(auth_store)
# Mirror to shared store + reseed the pool (best-effort), same as _login_nous.
try:
_write_shared_nous_state(auth_state)
except Exception:
pass
try:
_sync_nous_pool_from_auth_store()
except Exception:
pass
granted = auth_state.get("scope")
return isinstance(granted, str) and NOUS_BILLING_MANAGE_SCOPE in granted.split()
def _login_nous(args, pconfig: ProviderConfig) -> None:
"""Nous Portal device authorization flow."""
timeout_seconds = getattr(args, "timeout", None) or 15.0

View file

@ -0,0 +1,127 @@
"""Tests for the Phase 2b billing:manage scope step-up (auth.py)."""
from __future__ import annotations
import pytest
import hermes_cli.auth as auth
from hermes_cli.auth import (
NOUS_BILLING_MANAGE_SCOPE,
nous_token_has_billing_scope,
step_up_nous_billing_scope,
)
# ---------------------------------------------------------------------------
# nous_token_has_billing_scope
# ---------------------------------------------------------------------------
def test_has_scope_true_when_present(monkeypatch):
monkeypatch.setattr(
auth,
"get_provider_auth_state",
lambda p: {"scope": "inference:invoke tool:invoke billing:manage"},
)
assert nous_token_has_billing_scope() is True
def test_has_scope_false_when_absent(monkeypatch):
monkeypatch.setattr(
auth, "get_provider_auth_state", lambda p: {"scope": "inference:invoke tool:invoke"}
)
assert nous_token_has_billing_scope() is False
def test_has_scope_false_when_no_state(monkeypatch):
monkeypatch.setattr(auth, "get_provider_auth_state", lambda p: None)
assert nous_token_has_billing_scope() is False
def test_has_scope_no_substring_false_positive(monkeypatch):
# "billing:manage-lite" must NOT match billing:manage (split-based, not substring).
monkeypatch.setattr(
auth, "get_provider_auth_state", lambda p: {"scope": "billing:manage-lite"}
)
assert nous_token_has_billing_scope() is False
# ---------------------------------------------------------------------------
# step_up_nous_billing_scope
# ---------------------------------------------------------------------------
@pytest.fixture
def _stub_persist(monkeypatch):
"""Neutralize the persistence side-effects so step-up tests are pure."""
monkeypatch.setattr(auth, "_auth_store_lock", lambda: _NullCtx())
monkeypatch.setattr(auth, "_load_auth_store", lambda: {})
monkeypatch.setattr(auth, "_save_provider_state", lambda *a, **kw: None)
monkeypatch.setattr(auth, "_save_auth_store", lambda *a, **kw: "auth.json")
monkeypatch.setattr(auth, "_write_shared_nous_state", lambda *a, **kw: None)
monkeypatch.setattr(auth, "_sync_nous_pool_from_auth_store", lambda: None)
class _NullCtx:
def __enter__(self):
return self
def __exit__(self, *a):
return False
def test_step_up_requests_billing_scope_and_reuses_prior_urls(monkeypatch, _stub_persist):
monkeypatch.setattr(
auth,
"get_provider_auth_state",
lambda p: {
"scope": "inference:invoke tool:invoke",
"portal_base_url": "https://preview.example.com",
"inference_base_url": "https://inf.example.com",
"client_id": "hermes-cli",
},
)
captured = {}
def _fake_login(**kw):
captured.update(kw)
# Simulate the admin ticking the box → token comes back WITH the scope.
return {"scope": "inference:invoke tool:invoke billing:manage", "access_token": "t"}
monkeypatch.setattr(auth, "_nous_device_code_login", _fake_login)
granted = step_up_nous_billing_scope()
assert granted is True
# Requested scope must include billing:manage, preserving prior scopes.
assert NOUS_BILLING_MANAGE_SCOPE in captured["scope"].split()
assert "inference:invoke" in captured["scope"].split()
# Reuses the prior credential's deployment URLs (so a preview stays a preview).
assert captured["portal_base_url"] == "https://preview.example.com"
assert captured["client_id"] == "hermes-cli"
def test_step_up_returns_false_when_downscoped(monkeypatch, _stub_persist):
# Non-admin / unticked → NAS silently downscopes; token comes back WITHOUT scope.
monkeypatch.setattr(auth, "get_provider_auth_state", lambda p: {"scope": "inference:invoke"})
monkeypatch.setattr(
auth,
"_nous_device_code_login",
lambda **kw: {"scope": "inference:invoke", "access_token": "t"},
)
assert step_up_nous_billing_scope() is False
def test_step_up_falls_back_to_standard_scope_when_no_prior(monkeypatch, _stub_persist):
monkeypatch.setattr(auth, "get_provider_auth_state", lambda p: {})
captured = {}
def _fake_login(**kw):
captured.update(kw)
return {"scope": "inference:invoke tool:invoke billing:manage"}
monkeypatch.setattr(auth, "_nous_device_code_login", _fake_login)
step_up_nous_billing_scope()
requested = captured["scope"].split()
assert "inference:invoke" in requested
assert "tool:invoke" in requested
assert NOUS_BILLING_MANAGE_SCOPE in requested