From 2b972472cee873032f48e51c26e85a0badf36caa Mon Sep 17 00:00:00 2001 From: Hao Zhe Date: Tue, 26 May 2026 14:30:45 +0800 Subject: [PATCH] fix(memory): validate OpenViking manual setup steps --- plugins/memory/openviking/__init__.py | 285 ++++++++--- .../memory/test_openviking_provider.py | 460 +++++++++++++++++- 2 files changed, 671 insertions(+), 74 deletions(-) diff --git a/plugins/memory/openviking/__init__.py b/plugins/memory/openviking/__init__.py index 29b3e5ad7c4..c3180305fb0 100644 --- a/plugins/memory/openviking/__init__.py +++ b/plugins/memory/openviking/__init__.py @@ -223,6 +223,14 @@ class _VikingClient: except Exception: return False + def validate_auth(self) -> dict: + """Validate authenticated OpenViking access without mutating state.""" + return self.get("/api/v1/system/status") + + def validate_root_access(self) -> dict: + """Validate ROOT access against a read-only admin endpoint.""" + return self.get("/api/v1/admin/accounts") + # --------------------------------------------------------------------------- # Tool schemas @@ -564,17 +572,11 @@ def _remember_ovcli_path(provider_config: dict, ovcli_path: Path) -> None: def _ovcli_data_from_connection_values(values: dict) -> dict: data = {"url": _clean_config_value(values.get("endpoint")) or _DEFAULT_ENDPOINT} api_key = _clean_config_value(values.get("api_key")) - api_key_type = _clean_config_value(values.get("api_key_type")) - root_api_key = _clean_config_value(values.get("root_api_key")) account = _clean_config_value(values.get("account")) user = _clean_config_value(values.get("user")) agent = _clean_config_value(values.get("agent")) or _DEFAULT_AGENT if api_key: data["api_key"] = api_key - if root_api_key: - data["root_api_key"] = root_api_key - elif api_key and api_key_type == "root": - data["root_api_key"] = api_key if account: data["account"] = account if user: @@ -590,76 +592,217 @@ def _write_ovcli_config(path: Path, values: dict) -> None: _restrict_secret_file_permissions(path) -def _prompt_manual_connection_values(prompt, select, cancelled): - endpoint = _clean_config_value( - prompt("OpenViking server URL", default=_DEFAULT_ENDPOINT) - ) or _DEFAULT_ENDPOINT - is_local = _is_local_openviking_url(endpoint) +def _validate_openviking_reachability(endpoint: str) -> tuple[bool, str]: + endpoint = _clean_config_value(endpoint) or _DEFAULT_ENDPOINT + try: + client = _VikingClient(endpoint) + if client.health(): + return True, "" + except Exception as e: + return False, f"OpenViking server is not reachable at {endpoint}: {e}" + return False, f"OpenViking server is not reachable at {endpoint}." - values = { - "endpoint": endpoint, - "api_key": "", - "account": "", - "user": "", - "agent": "", - } - if is_local: - credential_choice = select( - " OpenViking credential", - [ - ("No API key", "local dev mode"), - ("User API key", "server derives account/user automatically"), - ("Root API key", "requires account and user IDs"), - ], - default=0, - cancel_returns=cancelled, - ) - if credential_choice == cancelled: - return _SETUP_CANCELLED - if credential_choice == 0: - values["agent"] = _clean_config_value( - prompt("OpenViking agent", default=_DEFAULT_AGENT) - ) or _DEFAULT_AGENT - return values - api_key_type = "root" if credential_choice == 2 else "user" - else: - credential_choice = select( - " OpenViking API key type", - [ - ("User API key", "server derives account/user automatically"), - ("Root API key", "requires account and user IDs"), - ], - default=0, - cancel_returns=cancelled, - ) - if credential_choice == cancelled: - return _SETUP_CANCELLED - api_key_type = "root" if credential_choice == 1 else "user" - values["api_key_type"] = api_key_type - api_key_label = ( - "OpenViking root API key" - if api_key_type == "root" - else "OpenViking user API key" +def _validate_openviking_auth(values: dict) -> tuple[bool, str]: + endpoint = _clean_config_value(values.get("endpoint")) or _DEFAULT_ENDPOINT + try: + client = _VikingClient( + endpoint, + _clean_config_value(values.get("api_key")), + account=_clean_config_value(values.get("account")), + user=_clean_config_value(values.get("user")), + agent=_clean_config_value(values.get("agent")) or _DEFAULT_AGENT, + ) + client.validate_auth() + except Exception as e: + return False, f"OpenViking authentication validation failed: {e}" + return True, "" + + +def _validate_openviking_root_access(values: dict) -> tuple[bool, str]: + endpoint = _clean_config_value(values.get("endpoint")) or _DEFAULT_ENDPOINT + try: + client = _VikingClient( + endpoint, + _clean_config_value(values.get("api_key")), + agent=_clean_config_value(values.get("agent")) or _DEFAULT_AGENT, + ) + client.validate_root_access() + except Exception as e: + return False, f"OpenViking root API key validation failed: {e}" + return True, "" + + +def _validate_openviking_user_key_scope(values: dict) -> tuple[bool, str]: + root_ok, _message = _validate_openviking_root_access(values) + if not root_ok: + return True, "" + return ( + False, + "That key has ROOT access. Choose Root API key and provide account/user, " + "or enter a user API key.", ) - values["api_key"] = _clean_config_value(prompt(api_key_label, secret=True)) - if not values["api_key"]: - print(f"\n {api_key_label} is required.") - print(" No changes saved.\n") - return None - if api_key_type == "root": - values["account"] = _clean_config_value(prompt("OpenViking account")) - values["user"] = _clean_config_value(prompt("OpenViking user")) - if not values["account"] or not values["user"]: - print("\n Root API keys require both OpenViking account and user.") - print(" No changes saved.\n") - return None - values["agent"] = _clean_config_value( - prompt("OpenViking agent", default=_DEFAULT_AGENT) - ) or _DEFAULT_AGENT - return values +def _retry_or_cancel_manual_setup(select, title: str, message: str, cancelled): + print(f" {message}") + choice = select( + title, + [ + ("Retry", "try this step again"), + ("Cancel setup", "no changes saved"), + ], + default=0, + cancel_returns=cancelled, + ) + if choice == 0: + return True + return _SETUP_CANCELLED + + +def _prompt_manual_connection_values(prompt, select, cancelled): + while True: + endpoint = _clean_config_value( + prompt("OpenViking server URL", default=_DEFAULT_ENDPOINT) + ) or _DEFAULT_ENDPOINT + reachable, message = _validate_openviking_reachability(endpoint) + if reachable: + print(" OpenViking server is reachable.") + break + retry = _retry_or_cancel_manual_setup( + select, + " OpenViking server unreachable", + message, + cancelled, + ) + if retry is _SETUP_CANCELLED: + return _SETUP_CANCELLED + + is_local = _is_local_openviking_url(endpoint) + while True: + values = { + "endpoint": endpoint, + "api_key": "", + "account": "", + "user": "", + "agent": "", + } + if is_local: + credential_choice = select( + " OpenViking credential", + [ + ("No API key", "local dev mode"), + ("User API key", "server derives account/user automatically"), + ("Root API key", "requires account and user IDs"), + ], + default=0, + cancel_returns=cancelled, + ) + if credential_choice == cancelled: + return _SETUP_CANCELLED + if credential_choice == 0: + values["agent"] = _clean_config_value( + prompt("OpenViking agent", default=_DEFAULT_AGENT) + ) or _DEFAULT_AGENT + authenticated, message = _validate_openviking_auth(values) + if authenticated: + print(" OpenViking local dev access validated.") + return values + retry = _retry_or_cancel_manual_setup( + select, + " OpenViking credential failed", + message, + cancelled, + ) + if retry is _SETUP_CANCELLED: + return _SETUP_CANCELLED + continue + api_key_type = "root" if credential_choice == 2 else "user" + else: + credential_choice = select( + " OpenViking API key type", + [ + ("User API key", "server derives account/user automatically"), + ("Root API key", "requires account and user IDs"), + ], + default=0, + cancel_returns=cancelled, + ) + if credential_choice == cancelled: + return _SETUP_CANCELLED + api_key_type = "root" if credential_choice == 1 else "user" + + values["api_key_type"] = api_key_type + api_key_label = ( + "OpenViking root API key" + if api_key_type == "root" + else "OpenViking user API key" + ) + values["api_key"] = _clean_config_value(prompt(api_key_label, secret=True)) + if not values["api_key"]: + retry = _retry_or_cancel_manual_setup( + select, + " OpenViking API key required", + f"{api_key_label} is required.", + cancelled, + ) + if retry is _SETUP_CANCELLED: + return _SETUP_CANCELLED + continue + + if api_key_type == "root": + root_ok, message = _validate_openviking_root_access(values) + if not root_ok: + retry = _retry_or_cancel_manual_setup( + select, + " OpenViking root API key failed", + message, + cancelled, + ) + if retry is _SETUP_CANCELLED: + return _SETUP_CANCELLED + continue + print(" OpenViking root API key validated.") + values["account"] = _clean_config_value(prompt("OpenViking account")) + values["user"] = _clean_config_value(prompt("OpenViking user")) + if not values["account"] or not values["user"]: + retry = _retry_or_cancel_manual_setup( + select, + " OpenViking tenant identity required", + "Root API keys require both OpenViking account and user.", + cancelled, + ) + if retry is _SETUP_CANCELLED: + return _SETUP_CANCELLED + continue + + values["agent"] = _clean_config_value( + prompt("OpenViking agent", default=_DEFAULT_AGENT) + ) or _DEFAULT_AGENT + authenticated, message = _validate_openviking_auth(values) + if authenticated: + if api_key_type == "user": + user_key_ok, message = _validate_openviking_user_key_scope(values) + if not user_key_ok: + retry = _retry_or_cancel_manual_setup( + select, + " OpenViking user API key is root key", + message, + cancelled, + ) + if retry is _SETUP_CANCELLED: + return _SETUP_CANCELLED + continue + print(" OpenViking API access validated.") + return values + retry = _retry_or_cancel_manual_setup( + select, + " OpenViking API access failed", + message, + cancelled, + ) + if retry is _SETUP_CANCELLED: + return _SETUP_CANCELLED # --------------------------------------------------------------------------- diff --git a/tests/plugins/memory/test_openviking_provider.py b/tests/plugins/memory/test_openviking_provider.py index 2ca648d3228..af03fba0552 100644 --- a/tests/plugins/memory/test_openviking_provider.py +++ b/tests/plugins/memory/test_openviking_provider.py @@ -7,6 +7,7 @@ from unittest.mock import MagicMock import pytest +import plugins.memory.openviking as openviking_module from plugins.memory.openviking import OpenVikingMemoryProvider, _VikingClient @@ -33,6 +34,27 @@ def _prompt_from_values(values: dict[str, str], *, forbidden: set[str] | None = return _prompt +def _allow_setup_validation(monkeypatch, *, root_access: bool = False): + monkeypatch.setattr( + openviking_module, + "_validate_openviking_reachability", + lambda endpoint: (True, ""), + raising=False, + ) + monkeypatch.setattr( + openviking_module, + "_validate_openviking_auth", + lambda values: (True, ""), + raising=False, + ) + monkeypatch.setattr( + openviking_module, + "_validate_openviking_root_access", + lambda values: (root_access, "" if root_access else "Requires role: root"), + raising=False, + ) + + @pytest.mark.skipif(os.name == "nt", reason="POSIX file modes") def test_openviking_env_writer_restricts_file_permissions(tmp_path): env_path = tmp_path / ".env" @@ -209,6 +231,7 @@ def test_post_setup_manual_remote_root_writes_ovcli_and_links(tmp_path, monkeypa ovcli_path.write_text(json.dumps({"url": "http://old.local"}), encoding="utf-8") monkeypatch.setenv("HERMES_HOME", str(hermes_home)) monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path)) + _allow_setup_validation(monkeypatch, root_access=True) from hermes_cli import memory_setup @@ -241,7 +264,6 @@ def test_post_setup_manual_remote_root_writes_ovcli_and_links(tmp_path, monkeypa assert data == { "url": "https://openviking.example", "api_key": "root-secret", - "root_api_key": "root-secret", "account": "acct", "user": "alice", "agent_id": "agent", @@ -257,6 +279,7 @@ def test_post_setup_manual_remote_user_keeps_only_hermes_env(tmp_path, monkeypat ovcli_path.write_text(original_ovcli, encoding="utf-8") monkeypatch.setenv("HERMES_HOME", str(hermes_home)) monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path)) + _allow_setup_validation(monkeypatch) from hermes_cli import memory_setup @@ -297,6 +320,294 @@ def test_post_setup_manual_remote_user_keeps_only_hermes_env(tmp_path, monkeypat assert "OPENVIKING_USER" not in env_text +def test_post_setup_manual_validation_failure_writes_nothing(tmp_path, monkeypatch): + _clear_openviking_env(monkeypatch) + hermes_home = tmp_path / "hermes" + hermes_home.mkdir() + ovcli_path = tmp_path / "ovcli.conf" + original_ovcli = json.dumps({"url": "http://old.local"}) + ovcli_path.write_text(original_ovcli, encoding="utf-8") + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path)) + _allow_setup_validation(monkeypatch) + monkeypatch.setattr( + openviking_module, + "_validate_openviking_auth", + lambda values: (False, "OpenViking authentication validation failed: bad key"), + raising=False, + ) + + from hermes_cli import config as hermes_config + from hermes_cli import memory_setup + + save_config = MagicMock() + choices = iter([2, 0, 1]) + monkeypatch.setattr(hermes_config, "save_config", save_config) + monkeypatch.setattr( + memory_setup, + "_curses_select", + lambda *args, **kwargs: next(choices), + ) + monkeypatch.setattr( + memory_setup, + "_prompt", + _prompt_from_values({ + "OpenViking server URL": "https://openviking.example", + "OpenViking user API key": "bad-key", + "OpenViking agent": "agent", + }), + ) + config = {"memory": {"provider": "builtin"}} + + OpenVikingMemoryProvider().post_setup(str(hermes_home), config) + + save_config.assert_not_called() + assert config == {"memory": {"provider": "builtin"}} + assert ovcli_path.read_text(encoding="utf-8") == original_ovcli + assert not (hermes_home / ".env").exists() + + +def test_post_setup_manual_retries_base_url_until_reachable(tmp_path, monkeypatch): + _clear_openviking_env(monkeypatch) + hermes_home = tmp_path / "hermes" + hermes_home.mkdir() + ovcli_path = tmp_path / "ovcli.conf" + ovcli_path.write_text(json.dumps({"url": "http://old.local"}), encoding="utf-8") + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path)) + monkeypatch.setattr(openviking_module, "_validate_openviking_auth", lambda values: (True, "")) + + reachability_calls = [] + + def validate_reachability(endpoint): + reachability_calls.append(endpoint) + if endpoint == "http://bad.local:1933": + return False, "OpenViking server is not reachable at http://bad.local:1933." + return True, "" + + monkeypatch.setattr(openviking_module, "_validate_openviking_reachability", validate_reachability) + monkeypatch.setattr(openviking_module, "_validate_openviking_root_access", lambda values: (False, "Requires role: root")) + + from hermes_cli import memory_setup + + prompts = { + "OpenViking server URL": iter(["http://bad.local:1933", "http://localhost:1933"]), + "OpenViking agent": iter(["agent"]), + } + + def fake_prompt(label, default=None, secret=False): + return next(prompts[label]) + + choices = iter([2, 0, 0, 1]) + monkeypatch.setattr( + memory_setup, + "_curses_select", + lambda *args, **kwargs: next(choices), + ) + monkeypatch.setattr(memory_setup, "_prompt", fake_prompt) + config = {"memory": {}} + + OpenVikingMemoryProvider().post_setup(str(hermes_home), config) + + assert reachability_calls == ["http://bad.local:1933", "http://localhost:1933"] + assert config["memory"]["provider"] == "openviking" + env_text = (hermes_home / ".env").read_text(encoding="utf-8") + assert "OPENVIKING_ENDPOINT=http://localhost:1933" in env_text + + +def test_post_setup_manual_retries_user_key_until_status_valid(tmp_path, monkeypatch): + _clear_openviking_env(monkeypatch) + hermes_home = tmp_path / "hermes" + hermes_home.mkdir() + ovcli_path = tmp_path / "ovcli.conf" + ovcli_path.write_text(json.dumps({"url": "http://old.local"}), encoding="utf-8") + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path)) + monkeypatch.setattr(openviking_module, "_validate_openviking_reachability", lambda endpoint: (True, "")) + monkeypatch.setattr(openviking_module, "_validate_openviking_root_access", lambda values: (False, "Requires role: root")) + + auth_calls = [] + + def validate_auth(values): + auth_calls.append(dict(values)) + if values["api_key"] == "bad-key": + return False, "OpenViking authentication validation failed: bad key" + return True, "" + + monkeypatch.setattr(openviking_module, "_validate_openviking_auth", validate_auth) + + from hermes_cli import memory_setup + + prompts = { + "OpenViking server URL": iter(["https://openviking.example"]), + "OpenViking user API key": iter(["bad-key", "good-key"]), + "OpenViking agent": iter(["agent", "agent"]), + } + + def fake_prompt(label, default=None, secret=False): + return next(prompts[label]) + + choices = iter([2, 0, 0, 0, 1]) + monkeypatch.setattr( + memory_setup, + "_curses_select", + lambda *args, **kwargs: next(choices), + ) + monkeypatch.setattr(memory_setup, "_prompt", fake_prompt) + config = {"memory": {}} + + OpenVikingMemoryProvider().post_setup(str(hermes_home), config) + + assert [call["api_key"] for call in auth_calls] == ["bad-key", "good-key"] + env_text = (hermes_home / ".env").read_text(encoding="utf-8") + assert "OPENVIKING_API_KEY=good-key" in env_text + + +def test_post_setup_manual_user_key_rejects_root_key(tmp_path, monkeypatch): + _clear_openviking_env(monkeypatch) + hermes_home = tmp_path / "hermes" + hermes_home.mkdir() + ovcli_path = tmp_path / "ovcli.conf" + ovcli_path.write_text(json.dumps({"url": "http://old.local"}), encoding="utf-8") + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path)) + monkeypatch.setattr(openviking_module, "_validate_openviking_reachability", lambda endpoint: (True, "")) + monkeypatch.setattr(openviking_module, "_validate_openviking_auth", lambda values: (True, "")) + + root_checks = [] + + def validate_root(values): + root_checks.append(values["api_key"]) + if values["api_key"] == "root-secret": + return True, "" + return False, "Requires role: root" + + monkeypatch.setattr(openviking_module, "_validate_openviking_root_access", validate_root) + + from hermes_cli import memory_setup + + prompts = { + "OpenViking server URL": iter(["https://openviking.example"]), + "OpenViking user API key": iter(["root-secret", "user-secret"]), + "OpenViking agent": iter(["agent", "agent"]), + } + + def fake_prompt(label, default=None, secret=False): + return next(prompts[label]) + + choices = iter([2, 0, 0, 0, 1]) + monkeypatch.setattr( + memory_setup, + "_curses_select", + lambda *args, **kwargs: next(choices), + ) + monkeypatch.setattr(memory_setup, "_prompt", fake_prompt) + config = {"memory": {}} + + OpenVikingMemoryProvider().post_setup(str(hermes_home), config) + + assert root_checks == ["root-secret", "user-secret"] + env_text = (hermes_home / ".env").read_text(encoding="utf-8") + assert "OPENVIKING_API_KEY=user-secret" in env_text + assert "OPENVIKING_API_KEY=root-secret" not in env_text + + +def test_post_setup_manual_root_key_requires_root_only_validation(tmp_path, monkeypatch): + _clear_openviking_env(monkeypatch) + hermes_home = tmp_path / "hermes" + hermes_home.mkdir() + ovcli_path = tmp_path / "ovcli.conf" + ovcli_path.write_text(json.dumps({"url": "http://old.local"}), encoding="utf-8") + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path)) + monkeypatch.setattr(openviking_module, "_validate_openviking_reachability", lambda endpoint: (True, "")) + monkeypatch.setattr(openviking_module, "_validate_openviking_auth", lambda values: (True, "")) + + root_calls = [] + + def validate_root(values): + root_calls.append(dict(values)) + return True, "" + + monkeypatch.setattr(openviking_module, "_validate_openviking_root_access", validate_root) + + from hermes_cli import memory_setup + + monkeypatch.setattr( + memory_setup, + "_prompt", + _prompt_from_values({ + "OpenViking server URL": "https://openviking.example", + "OpenViking root API key": "root-secret", + "OpenViking account": "acct", + "OpenViking user": "alice", + "OpenViking agent": "agent", + }), + ) + choices = iter([2, 1, 1]) + monkeypatch.setattr( + memory_setup, + "_curses_select", + lambda *args, **kwargs: next(choices), + ) + config = {"memory": {}} + + OpenVikingMemoryProvider().post_setup(str(hermes_home), config) + + assert [call["api_key"] for call in root_calls] == ["root-secret"] + assert config["memory"]["provider"] == "openviking" + + +def test_post_setup_manual_retries_root_key_before_account_prompts(tmp_path, monkeypatch): + _clear_openviking_env(monkeypatch) + hermes_home = tmp_path / "hermes" + hermes_home.mkdir() + ovcli_path = tmp_path / "ovcli.conf" + ovcli_path.write_text(json.dumps({"url": "http://old.local"}), encoding="utf-8") + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path)) + monkeypatch.setattr(openviking_module, "_validate_openviking_reachability", lambda endpoint: (True, "")) + monkeypatch.setattr(openviking_module, "_validate_openviking_auth", lambda values: (True, "")) + + def validate_root(values): + if values["api_key"] == "bad-root": + return False, "OpenViking root API key validation failed: bad key" + return True, "" + + monkeypatch.setattr(openviking_module, "_validate_openviking_root_access", validate_root) + + from hermes_cli import memory_setup + + prompt_events = [] + prompts = { + "OpenViking server URL": iter(["https://openviking.example"]), + "OpenViking root API key": iter(["bad-root", "good-root"]), + "OpenViking account": iter(["acct"]), + "OpenViking user": iter(["alice"]), + "OpenViking agent": iter(["agent"]), + } + + def fake_prompt(label, default=None, secret=False): + prompt_events.append(label) + return next(prompts[label]) + + choices = iter([2, 1, 0, 1, 1]) + monkeypatch.setattr( + memory_setup, + "_curses_select", + lambda *args, **kwargs: next(choices), + ) + monkeypatch.setattr(memory_setup, "_prompt", fake_prompt) + config = {"memory": {}} + + OpenVikingMemoryProvider().post_setup(str(hermes_home), config) + + assert prompt_events.index("OpenViking account") > prompt_events.index("OpenViking root API key") + assert prompt_events.count("OpenViking account") == 1 + env_text = (hermes_home / ".env").read_text(encoding="utf-8") + assert "OPENVIKING_API_KEY=good-root" in env_text + + def test_post_setup_manual_remote_requires_api_key(tmp_path, monkeypatch): _clear_openviking_env(monkeypatch) hermes_home = tmp_path / "hermes" @@ -312,7 +623,7 @@ def test_post_setup_manual_remote_requires_api_key(tmp_path, monkeypatch): save_config = MagicMock() monkeypatch.setattr(hermes_config, "save_config", save_config) - choices = iter([2, 0]) + choices = iter([2, 0, 1]) monkeypatch.setattr( memory_setup, "_curses_select", @@ -345,12 +656,13 @@ def test_post_setup_manual_root_requires_account_and_user(tmp_path, monkeypatch) ovcli_path.write_text(original_ovcli, encoding="utf-8") monkeypatch.setenv("HERMES_HOME", str(hermes_home)) monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path)) + _allow_setup_validation(monkeypatch, root_access=True) from hermes_cli import config as hermes_config from hermes_cli import memory_setup save_config = MagicMock() - choices = iter([2, 1]) + choices = iter([2, 1, 1]) monkeypatch.setattr(hermes_config, "save_config", save_config) monkeypatch.setattr( memory_setup, @@ -386,6 +698,7 @@ def test_post_setup_manual_local_allows_blank_api_key(tmp_path, monkeypatch): ovcli_path.write_text(original_ovcli, encoding="utf-8") monkeypatch.setenv("HERMES_HOME", str(hermes_home)) monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path)) + _allow_setup_validation(monkeypatch) from hermes_cli import memory_setup @@ -956,3 +1269,144 @@ def test_viking_client_health_sends_auth_headers(monkeypatch): assert client.health() is True assert captured["url"] == "https://example.com/health" assert captured["headers"]["Authorization"] == "Bearer test-key" + + +def test_viking_client_validate_auth_uses_authenticated_system_status(monkeypatch): + client = _VikingClient( + "https://example.com", + api_key="test-key", + account="acct", + user="alice", + agent="hermes", + ) + captured = {} + + def capture_get(url, **kwargs): + captured["url"] = url + captured["headers"] = kwargs.get("headers") or {} + return SimpleNamespace( + status_code=200, + text="", + json=lambda: {"status": "ok", "result": {"initialized": True}}, + raise_for_status=lambda: None, + ) + + monkeypatch.setattr(client._httpx, "get", capture_get) + + assert client.validate_auth() == { + "status": "ok", + "result": {"initialized": True}, + } + assert captured["url"] == "https://example.com/api/v1/system/status" + assert captured["headers"]["Authorization"] == "Bearer test-key" + assert captured["headers"]["X-OpenViking-Account"] == "acct" + assert captured["headers"]["X-OpenViking-User"] == "alice" + + +def test_viking_client_validate_root_access_uses_admin_accounts(monkeypatch): + client = _VikingClient( + "https://example.com", + api_key="root-key", + account="", + user="", + agent="hermes", + ) + captured = {} + + def capture_get(url, **kwargs): + captured["url"] = url + captured["headers"] = kwargs.get("headers") or {} + return SimpleNamespace( + status_code=200, + text="", + json=lambda: {"status": "ok", "result": []}, + raise_for_status=lambda: None, + ) + + monkeypatch.setattr(client._httpx, "get", capture_get) + + assert client.validate_root_access() == {"status": "ok", "result": []} + assert captured["url"] == "https://example.com/api/v1/admin/accounts" + assert captured["headers"]["Authorization"] == "Bearer root-key" + assert "X-OpenViking-Account" not in captured["headers"] + assert "X-OpenViking-User" not in captured["headers"] + + +def test_validate_openviking_reachability_uses_health_only(monkeypatch): + events = [] + + class FakeVikingClient: + def __init__(self, endpoint, api_key="", account="", user="", agent=""): + assert endpoint == "https://openviking.example" + assert api_key == "" + + def health(self): + events.append("health") + return True + + monkeypatch.setattr(openviking_module, "_VikingClient", FakeVikingClient) + + ok, message = openviking_module._validate_openviking_reachability( + "https://openviking.example" + ) + + assert ok is True + assert message == "" + assert events == ["health"] + + +def test_validate_openviking_auth_uses_status_without_health(monkeypatch): + events = [] + + class FakeVikingClient: + def __init__(self, endpoint, api_key="", account="", user="", agent=""): + assert endpoint == "https://openviking.example" + assert api_key == "test-key" + assert account == "acct" + assert user == "alice" + assert agent == "hermes" + + def validate_auth(self): + events.append("status") + return {"status": "ok"} + + monkeypatch.setattr(openviking_module, "_VikingClient", FakeVikingClient) + + ok, message = openviking_module._validate_openviking_auth({ + "endpoint": "https://openviking.example", + "api_key": "test-key", + "account": "acct", + "user": "alice", + "agent": "hermes", + }) + + assert ok is True + assert message == "" + assert events == ["status"] + + +def test_validate_openviking_root_access_uses_admin_endpoint(monkeypatch): + events = [] + + class FakeVikingClient: + def __init__(self, endpoint, api_key="", account="", user="", agent=""): + assert endpoint == "https://openviking.example" + assert api_key == "root-key" + assert account == "" + assert user == "" + assert agent == "hermes" + + def validate_root_access(self): + events.append("admin") + return {"status": "ok"} + + monkeypatch.setattr(openviking_module, "_VikingClient", FakeVikingClient) + + ok, message = openviking_module._validate_openviking_root_access({ + "endpoint": "https://openviking.example", + "api_key": "root-key", + }) + + assert ok is True + assert message == "" + assert events == ["admin"]