mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-31 06:51:29 +00:00
Contract V1 of nous-account-service PR #180 ships no refresh tokens, so the original Phase 6 silent-refresh design is replaced with a thinner '401 → redirect to /login' UX. The dashboard's gated middleware now emits a structured envelope on any auth failure; the SPA's fetch wrapper sees it and full-page-navigates the user through re-auth. hermes_cli/dashboard_auth/cookies.py: set_session_cookies(refresh_token='') SKIPS writing the hermes_session_rt cookie. Forward-compat: a non-empty refresh_token still emits the cookie unchanged, so a future Portal contract that starts issuing RTs flips the persistence on with no other change. clear_session_cookies still emits a Max-Age=0 deletion for the RT cookie so stale cookies from earlier deployments get flushed on logout / session expiry. Deprecation marker + rationale in module docstring per the user's docstring-only deprecation pattern. hermes_cli/dashboard_auth/middleware.py: _unauth_response now builds a structured JSON envelope for API 401s: { error: 'session_expired' | 'unauthenticated', detail: 'Unauthorized', reason: <internal>, login_url: '/login?next=<safe-path>' } HTML redirects also carry next= so a user landing on /sessions without a cookie bounces back to /sessions after re-auth. _safe_next_target validates same-origin: drops protocol-relative paths (//evil.com), absolute URLs, and any /login or /auth/* loop. Dead cookies are cleared on the 401 path so the browser stops replaying invalid tokens. hermes_cli/dashboard_auth/routes.py: /auth/callback accepts next= query param and validates via _validate_post_login_target (same rules as the gate's _safe_next_target — defence-in-depth because next= survived a full IDP round trip and attacker-controlled state can re-enter via the callback URL). Open-redirect attempts land at '/' instead. web/src/lib/api.ts: fetchJSON parses the 401 envelope and full-page-navigates to body.login_url ONLY on the known session-expiry error codes. Domain-level 401s (e.g. 
permission errors) bubble up as regular errors. credentials: 'include' added so cookie auth works for all fetches routed through this wrapper. sessionStorage.lastLocation is preserved for future use by AuthWidget / hermes_status. Test files marked with pytest.mark.xdist_group so the four files that mutate web_server.app.state.auth_required serialize onto the same xdist worker — eliminates 'works locally, fails in CI' app-state bleed. 20 new tests in test_dashboard_auth_401_reauth.py: - set_session_cookies(refresh_token='') skips RT cookie - clear_session_cookies still emits RT deletion - 401 envelope shape (unauthenticated vs session_expired) - dead cookie cleared on invalid-token 401 - login_url carries next= for deep paths - login loop avoided when path is /login/auth/api-auth - protocol-relative URL rejected - _safe_next_target unit tests (accept same-origin, reject loops/abs) - /auth/callback respects safe next= but rejects open redirects 2 pre-existing tests updated to accept the new /login?next=%2F shape. Full dashboard-auth suite: 168 passed, 1 skipped (Phase 0 pre-existing).
264 lines
10 KiB
Python
264 lines
10 KiB
Python
"""Tests for the WS-upgrade auth helper (Phase 5 task 5.2).
|
|
|
|
The dashboard's four WS endpoints (``/api/pty``, ``/api/ws``, ``/api/pub``,
|
|
``/api/events``) share an auth gate: ``_ws_auth_ok``. In loopback mode it
|
|
accepts ``?token=<_SESSION_TOKEN>``; in gated mode it accepts a single-use
|
|
``?ticket=`` minted by ``POST /api/auth/ws-ticket``.
|
|
|
|
These tests exercise the helper at the unit level (no actual WS upgrade)
|
|
plus the ticket-mint endpoint under realistic gated-mode setup. We don't
|
|
test the full WS upgrade because the starlette TestClient WS path has a
|
|
pre-existing regression unrelated to dashboard-auth.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from types import SimpleNamespace
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
# Phase 5 / Phase 6: the fixtures in this file mutate attributes on
# ``web_server.app.state`` (``auth_required``, ``bound_host``,
# ``bound_port``), and ``web_server.app`` is a shared module-level object.
# Run these tests in the same xdist worker so they don't race against each
# other (and against any other file that also touches ``app.state``) — the
# marker name is shared across all dashboard-auth test files that gate the
# app.
# NOTE(review): ``xdist_group`` only pins tests to a single worker when
# pytest-xdist is run with ``--dist loadgroup`` — confirm the CI invocation.
pytestmark = pytest.mark.xdist_group("dashboard_auth_app_state")
|
|
from fastapi.testclient import TestClient
|
|
|
|
from hermes_cli import web_server
|
|
from hermes_cli.dashboard_auth import clear_providers, register_provider
|
|
from hermes_cli.dashboard_auth.ws_tickets import (
|
|
TicketInvalid,
|
|
_reset_for_tests,
|
|
consume_ticket,
|
|
mint_ticket,
|
|
)
|
|
from tests.hermes_cli.conftest_dashboard_auth import StubAuthProvider
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
def gated_app():
    """Yield a ``TestClient`` for ``web_server.app`` switched into gated mode.

    Setup resets the ticket store, clears the provider registry and registers
    a ``StubAuthProvider``, then points ``app.state`` at an HTTPS host with
    ``auth_required=True``. Teardown clears providers, resets the ticket
    store again and puts the three ``app.state`` attributes back to the
    values they had before the fixture ran (``None`` when they were unset).
    """
    _reset_for_tests()
    clear_providers()
    register_provider(StubAuthProvider())

    gated_values = {
        "bound_host": "fly-app.fly.dev",
        "bound_port": 443,
        "auth_required": True,
    }
    # Snapshot all three attributes before overwriting any of them.
    saved = {
        name: getattr(web_server.app.state, name, None) for name in gated_values
    }
    for name, value in gated_values.items():
        setattr(web_server.app.state, name, value)

    yield TestClient(web_server.app, base_url="https://fly-app.fly.dev")

    clear_providers()
    _reset_for_tests()
    for name, value in saved.items():
        setattr(web_server.app.state, name, value)
|
|
|
|
|
|
@pytest.fixture
def loopback_app():
    """Yield a ``TestClient`` for ``web_server.app`` in loopback mode (gate OFF).

    Setup resets the ticket store and clears the provider registry, then
    binds ``app.state`` to ``127.0.0.1:8080`` with ``auth_required=False``.
    Teardown resets the ticket store and restores the three ``app.state``
    attributes to their pre-fixture values (``None`` when they were unset).
    """
    _reset_for_tests()
    clear_providers()

    loopback_values = {
        "bound_host": "127.0.0.1",
        "bound_port": 8080,
        "auth_required": False,
    }
    # Snapshot all three attributes before overwriting any of them.
    saved = {
        name: getattr(web_server.app.state, name, None) for name in loopback_values
    }
    for name, value in loopback_values.items():
        setattr(web_server.app.state, name, value)

    yield TestClient(web_server.app, base_url="http://127.0.0.1:8080")

    _reset_for_tests()
    for name, value in saved.items():
        setattr(web_server.app.state, name, value)
|
|
|
|
|
|
def _logged_in(client: TestClient) -> None:
    """Drive the stub OAuth round trip so the client holds session cookies.

    Issues ``GET /auth/login?provider=stub`` without following the redirect,
    lifts the ``state`` value out of the ``Location`` header, and replays it
    to ``GET /auth/callback`` together with the fixed ``stub_code``. Both
    responses are asserted to be ``302``.

    Args:
        client: The test client to log in; it is used for both requests.
    """
    login = client.get("/auth/login?provider=stub", follow_redirects=False)
    assert login.status_code == 302

    # Cut the value at the next ``&`` so that any query parameter following
    # ``state=`` in the redirect URL is not swallowed into the state and
    # replayed to the callback as part of it.
    location = login.headers["location"]
    state = location.split("state=", 1)[1].split("&", 1)[0]

    callback = client.get(
        f"/auth/callback?code=stub_code&state={state}", follow_redirects=False
    )
    assert callback.status_code == 302
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /api/auth/ws-ticket — the mint endpoint
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestWsTicketEndpoint:
    """Exercise ``POST /api/auth/ws-ticket`` against the gated app fixture."""

    def test_authenticated_session_can_mint(self, gated_app):
        """A logged-in client gets a 200 with a string ticket and a 30s TTL."""
        _logged_in(gated_app)
        response = gated_app.post("/api/auth/ws-ticket")
        assert response.status_code == 200
        payload = response.json()
        assert "ticket" in payload
        ticket = payload["ticket"]
        assert isinstance(ticket, str)
        assert len(ticket) >= 32
        assert payload["ttl_seconds"] == 30

    def test_unauthenticated_returns_401_or_redirect(self, gated_app):
        """Without a session the request never reaches the route."""
        response = gated_app.post("/api/auth/ws-ticket", follow_redirects=False)
        # gated_auth_middleware short-circuits before the route and answers
        # with either a 401 or a 302; both outcomes are acceptable here.
        assert response.status_code in (302, 401)

    def test_each_call_returns_a_distinct_ticket(self, gated_app):
        """Five consecutive mints yield five different ticket strings."""
        _logged_in(gated_app)
        seen = set()
        for _ in range(5):
            seen.add(gated_app.post("/api/auth/ws-ticket").json()["ticket"])
        assert len(seen) == 5

    def test_get_method_is_not_allowed(self, gated_app):
        """``GET`` on the mint endpoint must not succeed."""
        _logged_in(gated_app)
        response = gated_app.get("/api/auth/ws-ticket", follow_redirects=False)
        # Three acceptable outcomes:
        #   405 — GET is not registered for this path,
        #   401 — gated_auth_middleware sees an allowlist-miss,
        #   404 — the SPA catch-all swallows the request.
        # Any of them proves the endpoint isn't a GET (which would be
        # cookie-replayable from a malicious origin via <img src=…>).
        assert response.status_code in (401, 404, 405)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _ws_auth_ok — unit-level (synthetic WebSocket-shaped object)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _fake_ws(*, query: dict, client_host: str = "127.0.0.1", path: str = "/api/pty"):
    """Build a stand-in for starlette.WebSocket good enough for _ws_auth_ok.

    Args:
        query: Mapping served through ``query_params.get``.
        client_host: Value exposed as ``client.host``.
        path: Value exposed as ``url.path``.

    Returns:
        A ``SimpleNamespace`` carrying ``query_params``, ``client`` and
        ``url`` attributes.
    """

    class _QP:
        """Query-param lookup whose ``get`` falls back to ``""`` by default."""

        def __init__(self, q):
            self._q = q

        def get(self, k, default=""):
            if k in self._q:
                return self._q[k]
            return default

    params = _QP(query)
    peer = SimpleNamespace(host=client_host)
    location = SimpleNamespace(path=path)
    return SimpleNamespace(query_params=params, client=peer, url=location)
|
|
|
|
|
|
class TestWsAuthOkLoopback:
    """Gate OFF — legacy token path."""

    def test_correct_token_accepted(self, loopback_app):
        """The in-process ``_SESSION_TOKEN`` passes the check."""
        socket = _fake_ws(query={"token": web_server._SESSION_TOKEN})
        verdict = web_server._ws_auth_ok(socket)
        assert verdict is True

    def test_wrong_token_rejected(self, loopback_app):
        """Any other token value fails the check."""
        socket = _fake_ws(query={"token": "not-the-real-token"})
        verdict = web_server._ws_auth_ok(socket)
        assert verdict is False

    def test_missing_token_rejected(self, loopback_app):
        """An empty query string fails the check."""
        socket = _fake_ws(query={})
        verdict = web_server._ws_auth_ok(socket)
        assert verdict is False

    def test_ticket_param_ignored_in_loopback(self, loopback_app):
        """A freshly minted ticket alone is not enough in loopback mode."""
        # Loopback mode only looks at ?token=, so a ticket slipped into the
        # query string must not count as one.
        minted = mint_ticket(user_id="u1", provider="stub")
        socket = _fake_ws(query={"ticket": minted})
        verdict = web_server._ws_auth_ok(socket)
        assert verdict is False
|
|
|
|
|
|
class TestWsAuthOkGated:
    """Gate ON — ticket path only."""

    def test_valid_ticket_accepted(self, gated_app):
        """A freshly minted ticket passes the check."""
        minted = mint_ticket(user_id="u1", provider="stub")
        socket = _fake_ws(query={"ticket": minted})
        assert web_server._ws_auth_ok(socket) is True

    def test_consumed_ticket_rejected(self, gated_app):
        """The same ticket passes once and fails on replay."""
        minted = mint_ticket(user_id="u1", provider="stub")

        first_attempt = _fake_ws(query={"ticket": minted})
        assert web_server._ws_auth_ok(first_attempt) is True

        # Tickets are single-use, so presenting it again must fail.
        replay = _fake_ws(query={"ticket": minted})
        assert web_server._ws_auth_ok(replay) is False

    def test_unknown_ticket_rejected(self, gated_app):
        """A ticket string that was never minted fails the check."""
        socket = _fake_ws(query={"ticket": "never-minted"})
        assert web_server._ws_auth_ok(socket) is False

    def test_missing_ticket_rejected(self, gated_app):
        """An empty query string fails the check."""
        socket = _fake_ws(query={})
        assert web_server._ws_auth_ok(socket) is False

    def test_legacy_token_rejected_in_gated_mode(self, gated_app):
        """Critical: gated mode must NOT honour the legacy token path
        even when someone has access to the in-process value of
        _SESSION_TOKEN (e.g. a leaked log line)."""
        socket = _fake_ws(query={"token": web_server._SESSION_TOKEN})
        assert web_server._ws_auth_ok(socket) is False

    def test_rejection_audit_logs(self, gated_app, tmp_path, monkeypatch):
        """A rejected ticket must not crash, and is logged if a log exists."""
        # Redirect the audit log into a tmp dir so its contents can be read.
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        from hermes_cli.dashboard_auth import audit as audit_mod

        # The log path is resolved lazily on the first audit_log() call, so
        # drop any cached handler to force it to re-resolve.
        if hasattr(audit_mod, "_LOGGER"):
            monkeypatch.setattr(audit_mod, "_LOGGER", None, raising=False)

        socket = _fake_ws(query={"ticket": "never-minted"})
        assert web_server._ws_auth_ok(socket) is False

        audit_path = tmp_path / "logs" / "dashboard-auth.log"
        # The audit module may write asynchronously through stdlib logging,
        # but flush is synchronous. A missing file means the logger may not
        # have been initialized in this process — acceptable as long as the
        # rejection path above didn't crash.
        if not audit_path.exists():
            return
        assert "ws_ticket_rejected" in audit_path.read_text()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _build_sidecar_url — gated mode mints a server-internal ticket
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSidecarUrl:
    """Check which credential ``_build_sidecar_url`` embeds in each mode."""

    def test_loopback_uses_session_token(self, loopback_app):
        """Loopback mode: the URL carries ``token=`` and no ``ticket=``."""
        url = web_server._build_sidecar_url("ch-1")
        assert url is not None
        assert f"token={web_server._SESSION_TOKEN}" in url
        assert "ticket=" not in url

    def test_gated_uses_ticket(self, gated_app):
        """Gated mode: the URL carries a live ``ticket=`` and no ``token=``."""
        url = web_server._build_sidecar_url("ch-1")
        assert url is not None
        assert "token=" not in url
        assert "ticket=" in url
        # And the ticket should be live: consuming it must succeed.
        ticket = url.split("ticket=", 1)[1].split("&", 1)[0]
        info = consume_ticket(ticket)
        # Sidecar tickets are bound to the pseudo-user so audit logs can
        # distinguish them from real browser tickets.
        assert info["user_id"] == "pty-sidecar"
        assert info["provider"] == "server-internal"

    def test_no_bound_host_returns_none(self, gated_app):
        """With ``bound_host`` unset, no sidecar URL is produced."""
        state = web_server.app.state
        # Remember what the fixture configured instead of repeating its
        # literal host here, so the two cannot drift apart.
        saved_host = state.bound_host
        state.bound_host = None
        try:
            assert web_server._build_sidecar_url("ch") is None
        finally:
            state.bound_host = saved_host
|