diff --git a/tests/hermes_cli/conftest_dashboard_auth.py b/tests/hermes_cli/conftest_dashboard_auth.py new file mode 100644 index 00000000000..597c4b39b64 --- /dev/null +++ b/tests/hermes_cli/conftest_dashboard_auth.py @@ -0,0 +1,173 @@ +"""Stub auth provider + shared fixtures for dashboard-auth tests. + +NOT a pytest conftest.py — this is an importable helper module. Phase 2 +of the dashboard-OAuth plan; used by Phase 3's end-to-end gate tests. + +Import via:: + + from tests.hermes_cli.conftest_dashboard_auth import StubAuthProvider + +The stub bounces straight back to the callback with a fake code so tests +can complete the OAuth round trip in-process without external network. + +Tokens are HMAC-signed JSON blobs (not real JWTs) — just enough structure +for ``verify_session`` to detect tampering and expiry. +""" +from __future__ import annotations + +import base64 +import hashlib +import hmac +import json +import secrets +import time + +from hermes_cli.dashboard_auth.base import ( + DashboardAuthProvider, + InvalidCodeError, + LoginStart, + RefreshExpiredError, + Session, +) + +_STUB_SECRET = b"stub-test-secret-not-for-prod" + + +def _sign(payload: dict) -> str: + """Produce a tamper-evident opaque token. + + Not a real JWT — just a base64(JSON|HMAC-SHA256) blob with enough + structure to round-trip through verify_session. + """ + raw = json.dumps(payload, separators=(",", ":")).encode() + sig = hmac.new(_STUB_SECRET, raw, hashlib.sha256).digest() + return base64.urlsafe_b64encode(raw + b"." + sig).decode() + + +def _unsign(token: str) -> dict | None: + """Inverse of ``_sign``; returns None on any tamper/decode failure.""" + try: + blob = base64.urlsafe_b64decode(token.encode()) + raw, sig = blob.rsplit(b".", 1) + expected = hmac.new(_STUB_SECRET, raw, hashlib.sha256).digest() + if not hmac.compare_digest(sig, expected): + return None + return json.loads(raw) + except Exception: + return None + + +class StubAuthProvider(DashboardAuthProvider): + """Local fake IDP for E2E tests. + + ``start_login`` returns a redirect to + ``{redirect_uri}?code=stub_code&state={s}`` so the test harness can + walk the full round trip in-process without talking to anything + external. ``access_token`` is an HMAC-signed JSON blob; + ``verify_session`` decodes and checks ``exp``. + """ + + name = "stub" + display_name = "Stub IdP (test only)" + + def __init__(self, default_ttl: int = 3600): + self._default_ttl = default_ttl + # state → verifier mapping, cleared on complete_login + self._state_to_verifier: dict[str, str] = {} + + def start_login(self, *, redirect_uri: str) -> LoginStart: + state = secrets.token_urlsafe(16) + verifier = secrets.token_urlsafe(32) + self._state_to_verifier[state] = verifier + return LoginStart( + redirect_url=f"{redirect_uri}?code=stub_code&state={state}", + cookie_payload={ + "hermes_session_pkce": f"state={state};verifier={verifier}", + }, + ) + + def complete_login( + self, *, code: str, state: str, code_verifier: str, redirect_uri: str, + ) -> Session: + if code != "stub_code": + raise InvalidCodeError( + f"stub expects code='stub_code', got {code!r}" + ) + expected_verifier = self._state_to_verifier.get(state) + if expected_verifier is None or expected_verifier != code_verifier: + raise InvalidCodeError("stub state/verifier mismatch") + del self._state_to_verifier[state] + + now = int(time.time()) + exp = now + self._default_ttl + return Session( + user_id="stub-user-1", + email="stub@example.test", + display_name="Stub User", + org_id="stub-org-1", + provider=self.name, + expires_at=exp, + access_token=_sign({ + "sub": "stub-user-1", + "email": "stub@example.test", + "name": "Stub User", + "org_id": "stub-org-1", + "exp": exp, + }), + refresh_token=_sign({ + "sub": "stub-user-1", + "kind": "refresh", + "exp": now + 30 * 86400, + }), + ) + + def verify_session(self, *, access_token: str): + payload = _unsign(access_token) + # ``<=`` so default_ttl=0 produces a born-expired token. This + # matches what Phase 6's silent-refresh tests need ("set a 0-TTL + # access token; the next request should refresh transparently"). + if payload is None or payload.get("exp", 0) <= int(time.time()): + return None + return Session( + user_id=payload["sub"], + email=payload["email"], + display_name=payload["name"], + org_id=payload["org_id"], + provider=self.name, + expires_at=payload["exp"], + access_token=access_token, + refresh_token="", # not surfaced on verify + ) + + def refresh_session(self, *, refresh_token: str) -> Session: + payload = _unsign(refresh_token) + # ``<=`` for symmetry with verify_session — a 0-TTL token is + # treated as expired. + if payload is None or payload.get("exp", 0) <= int(time.time()): + raise RefreshExpiredError("stub refresh token expired/invalid") + now = int(time.time()) + exp = now + self._default_ttl + return Session( + user_id=payload["sub"], + email="stub@example.test", + display_name="Stub User", + org_id="stub-org-1", + provider=self.name, + expires_at=exp, + access_token=_sign({ + "sub": payload["sub"], + "email": "stub@example.test", + "name": "Stub User", + "org_id": "stub-org-1", + "exp": exp, + }), + refresh_token=_sign({ + "sub": payload["sub"], + "kind": "refresh", + "exp": now + 30 * 86400, + }), + ) + + def revoke_session(self, *, refresh_token: str) -> None: + # Stub is in-memory; nothing to revoke server-side. + return None diff --git a/tests/hermes_cli/test_dashboard_auth_stub_provider.py b/tests/hermes_cli/test_dashboard_auth_stub_provider.py new file mode 100644 index 00000000000..8a6676ea6a1 --- /dev/null +++ b/tests/hermes_cli/test_dashboard_auth_stub_provider.py @@ -0,0 +1,150 @@ +"""Contract test for the StubAuthProvider used in dashboard-auth E2E tests. + +Phase 2 of the dashboard-OAuth plan. Validates the stub against the +provider protocol so subsequent phases that depend on its behavior +have a guarantee. +""" +from __future__ import annotations + +import pytest + +from hermes_cli.dashboard_auth.base import ( + InvalidCodeError, RefreshExpiredError, assert_protocol_compliance, +) +from tests.hermes_cli.conftest_dashboard_auth import StubAuthProvider + + +def _pkce_payload(ls) -> dict: + """Parse ``state=...;verifier=...`` out of the LoginStart cookie payload.""" + return dict( + item.split("=", 1) + for item in ls.cookie_payload["hermes_session_pkce"].split(";") + ) + + +def test_stub_complies_with_protocol(): + assert assert_protocol_compliance(StubAuthProvider) is None + + +def test_stub_start_login_returns_callback_redirect(): + p = StubAuthProvider() + ls = p.start_login(redirect_uri="https://x.fly.dev/auth/callback") + assert "code=stub_code" in ls.redirect_url + assert "state=" in ls.redirect_url + assert "hermes_session_pkce" in ls.cookie_payload + + +def test_stub_complete_login_with_matching_state_succeeds(): + p = StubAuthProvider() + ls = p.start_login(redirect_uri="https://x.fly.dev/auth/callback") + payload = _pkce_payload(ls) + sess = p.complete_login( + code="stub_code", + state=payload["state"], + code_verifier=payload["verifier"], + redirect_uri="https://x.fly.dev/auth/callback", + ) + assert sess.user_id == "stub-user-1" + assert sess.email == "stub@example.test" + assert sess.display_name == "Stub User" + assert sess.org_id == "stub-org-1" + assert sess.provider == "stub" + assert sess.access_token and sess.refresh_token + + +def test_stub_complete_login_rejects_mismatched_state(): + p = StubAuthProvider() + p.start_login(redirect_uri="https://x.fly.dev/auth/callback") + with pytest.raises(InvalidCodeError): + p.complete_login( + code="stub_code", + state="WRONG", + code_verifier="anything", + redirect_uri="https://x.fly.dev/auth/callback", + ) + + +def test_stub_complete_login_rejects_wrong_code(): + p = StubAuthProvider() + ls = p.start_login(redirect_uri="https://x.fly.dev/auth/callback") + payload = _pkce_payload(ls) + with pytest.raises(InvalidCodeError): + p.complete_login( + code="BAD", + state=payload["state"], + code_verifier=payload["verifier"], + redirect_uri="https://x.fly.dev/auth/callback", + ) + + +def test_stub_verify_session_round_trips(): + p = StubAuthProvider() + ls = p.start_login(redirect_uri="https://x.fly.dev/auth/callback") + payload = _pkce_payload(ls) + sess = p.complete_login( + code="stub_code", + state=payload["state"], + code_verifier=payload["verifier"], + redirect_uri="https://x.fly.dev/auth/callback", + ) + verified = p.verify_session(access_token=sess.access_token) + assert verified is not None + assert verified.user_id == "stub-user-1" + assert verified.org_id == "stub-org-1" + + +def test_stub_verify_expired_session_returns_none(): + p = StubAuthProvider(default_ttl=0) + ls = p.start_login(redirect_uri="https://x/auth/callback") + payload = _pkce_payload(ls) + sess = p.complete_login( + code="stub_code", + state=payload["state"], + code_verifier=payload["verifier"], + redirect_uri="https://x/auth/callback", + ) + # default_ttl=0 means the access token is born already expired + # (verify uses ``<=`` so exp == now counts as expired). + assert p.verify_session(access_token=sess.access_token) is None + + +def test_stub_verify_tampered_token_returns_none(): + p = StubAuthProvider() + assert p.verify_session(access_token="garbage-not-a-real-token") is None + + +def test_stub_refresh_round_trips(): + p = StubAuthProvider() + ls = p.start_login(redirect_uri="https://x/auth/callback") + payload = _pkce_payload(ls) + sess = p.complete_login( + code="stub_code", + state=payload["state"], + code_verifier=payload["verifier"], + redirect_uri="https://x/auth/callback", + ) + refreshed = p.refresh_session(refresh_token=sess.refresh_token) + # Refresh must return a valid Session for the same identity. (Tokens + # may compare equal byte-for-byte if the refresh happens within the + # same wall-clock second as the original — payload contents are + # otherwise identical and HMAC is deterministic. The behavioural + # invariant is just "refresh succeeds and identity survives".) + assert refreshed.user_id == "stub-user-1" + assert refreshed.access_token # non-empty + assert refreshed.refresh_token # non-empty + # And the refreshed access_token is still verifiable. + verified = p.verify_session(access_token=refreshed.access_token) + assert verified is not None + assert verified.user_id == "stub-user-1" + + +def test_stub_refresh_expired_raises(): + p = StubAuthProvider() + with pytest.raises(RefreshExpiredError): + p.refresh_session(refresh_token="garbage") + + +def test_stub_revoke_is_silent(): + p = StubAuthProvider() + # Best-effort; must never raise. + p.revoke_session(refresh_token="anything")