fix(memory): validate OpenViking manual setup steps

This commit is contained in:
Hao Zhe 2026-05-26 14:30:45 +08:00
parent a893d77d8d
commit 2b972472ce
2 changed files with 671 additions and 74 deletions

View file

@ -223,6 +223,14 @@ class _VikingClient:
except Exception:
return False
def validate_auth(self) -> dict:
"""Validate authenticated OpenViking access without mutating state."""
return self.get("/api/v1/system/status")
def validate_root_access(self) -> dict:
"""Validate ROOT access against a read-only admin endpoint."""
return self.get("/api/v1/admin/accounts")
# ---------------------------------------------------------------------------
# Tool schemas
@ -564,17 +572,11 @@ def _remember_ovcli_path(provider_config: dict, ovcli_path: Path) -> None:
def _ovcli_data_from_connection_values(values: dict) -> dict:
data = {"url": _clean_config_value(values.get("endpoint")) or _DEFAULT_ENDPOINT}
api_key = _clean_config_value(values.get("api_key"))
api_key_type = _clean_config_value(values.get("api_key_type"))
root_api_key = _clean_config_value(values.get("root_api_key"))
account = _clean_config_value(values.get("account"))
user = _clean_config_value(values.get("user"))
agent = _clean_config_value(values.get("agent")) or _DEFAULT_AGENT
if api_key:
data["api_key"] = api_key
if root_api_key:
data["root_api_key"] = root_api_key
elif api_key and api_key_type == "root":
data["root_api_key"] = api_key
if account:
data["account"] = account
if user:
@ -590,76 +592,217 @@ def _write_ovcli_config(path: Path, values: dict) -> None:
_restrict_secret_file_permissions(path)
def _prompt_manual_connection_values(prompt, select, cancelled):
endpoint = _clean_config_value(
prompt("OpenViking server URL", default=_DEFAULT_ENDPOINT)
) or _DEFAULT_ENDPOINT
is_local = _is_local_openviking_url(endpoint)
def _validate_openviking_reachability(endpoint: str) -> tuple[bool, str]:
endpoint = _clean_config_value(endpoint) or _DEFAULT_ENDPOINT
try:
client = _VikingClient(endpoint)
if client.health():
return True, ""
except Exception as e:
return False, f"OpenViking server is not reachable at {endpoint}: {e}"
return False, f"OpenViking server is not reachable at {endpoint}."
values = {
"endpoint": endpoint,
"api_key": "",
"account": "",
"user": "",
"agent": "",
}
if is_local:
credential_choice = select(
" OpenViking credential",
[
("No API key", "local dev mode"),
("User API key", "server derives account/user automatically"),
("Root API key", "requires account and user IDs"),
],
default=0,
cancel_returns=cancelled,
)
if credential_choice == cancelled:
return _SETUP_CANCELLED
if credential_choice == 0:
values["agent"] = _clean_config_value(
prompt("OpenViking agent", default=_DEFAULT_AGENT)
) or _DEFAULT_AGENT
return values
api_key_type = "root" if credential_choice == 2 else "user"
else:
credential_choice = select(
" OpenViking API key type",
[
("User API key", "server derives account/user automatically"),
("Root API key", "requires account and user IDs"),
],
default=0,
cancel_returns=cancelled,
)
if credential_choice == cancelled:
return _SETUP_CANCELLED
api_key_type = "root" if credential_choice == 1 else "user"
values["api_key_type"] = api_key_type
api_key_label = (
"OpenViking root API key"
if api_key_type == "root"
else "OpenViking user API key"
def _validate_openviking_auth(values: dict) -> tuple[bool, str]:
endpoint = _clean_config_value(values.get("endpoint")) or _DEFAULT_ENDPOINT
try:
client = _VikingClient(
endpoint,
_clean_config_value(values.get("api_key")),
account=_clean_config_value(values.get("account")),
user=_clean_config_value(values.get("user")),
agent=_clean_config_value(values.get("agent")) or _DEFAULT_AGENT,
)
client.validate_auth()
except Exception as e:
return False, f"OpenViking authentication validation failed: {e}"
return True, ""
def _validate_openviking_root_access(values: dict) -> tuple[bool, str]:
endpoint = _clean_config_value(values.get("endpoint")) or _DEFAULT_ENDPOINT
try:
client = _VikingClient(
endpoint,
_clean_config_value(values.get("api_key")),
agent=_clean_config_value(values.get("agent")) or _DEFAULT_AGENT,
)
client.validate_root_access()
except Exception as e:
return False, f"OpenViking root API key validation failed: {e}"
return True, ""
def _validate_openviking_user_key_scope(values: dict) -> tuple[bool, str]:
root_ok, _message = _validate_openviking_root_access(values)
if not root_ok:
return True, ""
return (
False,
"That key has ROOT access. Choose Root API key and provide account/user, "
"or enter a user API key.",
)
values["api_key"] = _clean_config_value(prompt(api_key_label, secret=True))
if not values["api_key"]:
print(f"\n {api_key_label} is required.")
print(" No changes saved.\n")
return None
if api_key_type == "root":
values["account"] = _clean_config_value(prompt("OpenViking account"))
values["user"] = _clean_config_value(prompt("OpenViking user"))
if not values["account"] or not values["user"]:
print("\n Root API keys require both OpenViking account and user.")
print(" No changes saved.\n")
return None
values["agent"] = _clean_config_value(
prompt("OpenViking agent", default=_DEFAULT_AGENT)
) or _DEFAULT_AGENT
return values
def _retry_or_cancel_manual_setup(select, title: str, message: str, cancelled):
print(f" {message}")
choice = select(
title,
[
("Retry", "try this step again"),
("Cancel setup", "no changes saved"),
],
default=0,
cancel_returns=cancelled,
)
if choice == 0:
return True
return _SETUP_CANCELLED
def _prompt_manual_connection_values(prompt, select, cancelled):
while True:
endpoint = _clean_config_value(
prompt("OpenViking server URL", default=_DEFAULT_ENDPOINT)
) or _DEFAULT_ENDPOINT
reachable, message = _validate_openviking_reachability(endpoint)
if reachable:
print(" OpenViking server is reachable.")
break
retry = _retry_or_cancel_manual_setup(
select,
" OpenViking server unreachable",
message,
cancelled,
)
if retry is _SETUP_CANCELLED:
return _SETUP_CANCELLED
is_local = _is_local_openviking_url(endpoint)
while True:
values = {
"endpoint": endpoint,
"api_key": "",
"account": "",
"user": "",
"agent": "",
}
if is_local:
credential_choice = select(
" OpenViking credential",
[
("No API key", "local dev mode"),
("User API key", "server derives account/user automatically"),
("Root API key", "requires account and user IDs"),
],
default=0,
cancel_returns=cancelled,
)
if credential_choice == cancelled:
return _SETUP_CANCELLED
if credential_choice == 0:
values["agent"] = _clean_config_value(
prompt("OpenViking agent", default=_DEFAULT_AGENT)
) or _DEFAULT_AGENT
authenticated, message = _validate_openviking_auth(values)
if authenticated:
print(" OpenViking local dev access validated.")
return values
retry = _retry_or_cancel_manual_setup(
select,
" OpenViking credential failed",
message,
cancelled,
)
if retry is _SETUP_CANCELLED:
return _SETUP_CANCELLED
continue
api_key_type = "root" if credential_choice == 2 else "user"
else:
credential_choice = select(
" OpenViking API key type",
[
("User API key", "server derives account/user automatically"),
("Root API key", "requires account and user IDs"),
],
default=0,
cancel_returns=cancelled,
)
if credential_choice == cancelled:
return _SETUP_CANCELLED
api_key_type = "root" if credential_choice == 1 else "user"
values["api_key_type"] = api_key_type
api_key_label = (
"OpenViking root API key"
if api_key_type == "root"
else "OpenViking user API key"
)
values["api_key"] = _clean_config_value(prompt(api_key_label, secret=True))
if not values["api_key"]:
retry = _retry_or_cancel_manual_setup(
select,
" OpenViking API key required",
f"{api_key_label} is required.",
cancelled,
)
if retry is _SETUP_CANCELLED:
return _SETUP_CANCELLED
continue
if api_key_type == "root":
root_ok, message = _validate_openviking_root_access(values)
if not root_ok:
retry = _retry_or_cancel_manual_setup(
select,
" OpenViking root API key failed",
message,
cancelled,
)
if retry is _SETUP_CANCELLED:
return _SETUP_CANCELLED
continue
print(" OpenViking root API key validated.")
values["account"] = _clean_config_value(prompt("OpenViking account"))
values["user"] = _clean_config_value(prompt("OpenViking user"))
if not values["account"] or not values["user"]:
retry = _retry_or_cancel_manual_setup(
select,
" OpenViking tenant identity required",
"Root API keys require both OpenViking account and user.",
cancelled,
)
if retry is _SETUP_CANCELLED:
return _SETUP_CANCELLED
continue
values["agent"] = _clean_config_value(
prompt("OpenViking agent", default=_DEFAULT_AGENT)
) or _DEFAULT_AGENT
authenticated, message = _validate_openviking_auth(values)
if authenticated:
if api_key_type == "user":
user_key_ok, message = _validate_openviking_user_key_scope(values)
if not user_key_ok:
retry = _retry_or_cancel_manual_setup(
select,
" OpenViking user API key is root key",
message,
cancelled,
)
if retry is _SETUP_CANCELLED:
return _SETUP_CANCELLED
continue
print(" OpenViking API access validated.")
return values
retry = _retry_or_cancel_manual_setup(
select,
" OpenViking API access failed",
message,
cancelled,
)
if retry is _SETUP_CANCELLED:
return _SETUP_CANCELLED
# ---------------------------------------------------------------------------

View file

@ -7,6 +7,7 @@ from unittest.mock import MagicMock
import pytest
import plugins.memory.openviking as openviking_module
from plugins.memory.openviking import OpenVikingMemoryProvider, _VikingClient
@ -33,6 +34,27 @@ def _prompt_from_values(values: dict[str, str], *, forbidden: set[str] | None =
return _prompt
def _allow_setup_validation(monkeypatch, *, root_access: bool = False):
monkeypatch.setattr(
openviking_module,
"_validate_openviking_reachability",
lambda endpoint: (True, ""),
raising=False,
)
monkeypatch.setattr(
openviking_module,
"_validate_openviking_auth",
lambda values: (True, ""),
raising=False,
)
monkeypatch.setattr(
openviking_module,
"_validate_openviking_root_access",
lambda values: (root_access, "" if root_access else "Requires role: root"),
raising=False,
)
@pytest.mark.skipif(os.name == "nt", reason="POSIX file modes")
def test_openviking_env_writer_restricts_file_permissions(tmp_path):
env_path = tmp_path / ".env"
@ -209,6 +231,7 @@ def test_post_setup_manual_remote_root_writes_ovcli_and_links(tmp_path, monkeypa
ovcli_path.write_text(json.dumps({"url": "http://old.local"}), encoding="utf-8")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path))
_allow_setup_validation(monkeypatch, root_access=True)
from hermes_cli import memory_setup
@ -241,7 +264,6 @@ def test_post_setup_manual_remote_root_writes_ovcli_and_links(tmp_path, monkeypa
assert data == {
"url": "https://openviking.example",
"api_key": "root-secret",
"root_api_key": "root-secret",
"account": "acct",
"user": "alice",
"agent_id": "agent",
@ -257,6 +279,7 @@ def test_post_setup_manual_remote_user_keeps_only_hermes_env(tmp_path, monkeypat
ovcli_path.write_text(original_ovcli, encoding="utf-8")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path))
_allow_setup_validation(monkeypatch)
from hermes_cli import memory_setup
@ -297,6 +320,294 @@ def test_post_setup_manual_remote_user_keeps_only_hermes_env(tmp_path, monkeypat
assert "OPENVIKING_USER" not in env_text
def test_post_setup_manual_validation_failure_writes_nothing(tmp_path, monkeypatch):
_clear_openviking_env(monkeypatch)
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
ovcli_path = tmp_path / "ovcli.conf"
original_ovcli = json.dumps({"url": "http://old.local"})
ovcli_path.write_text(original_ovcli, encoding="utf-8")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path))
_allow_setup_validation(monkeypatch)
monkeypatch.setattr(
openviking_module,
"_validate_openviking_auth",
lambda values: (False, "OpenViking authentication validation failed: bad key"),
raising=False,
)
from hermes_cli import config as hermes_config
from hermes_cli import memory_setup
save_config = MagicMock()
choices = iter([2, 0, 1])
monkeypatch.setattr(hermes_config, "save_config", save_config)
monkeypatch.setattr(
memory_setup,
"_curses_select",
lambda *args, **kwargs: next(choices),
)
monkeypatch.setattr(
memory_setup,
"_prompt",
_prompt_from_values({
"OpenViking server URL": "https://openviking.example",
"OpenViking user API key": "bad-key",
"OpenViking agent": "agent",
}),
)
config = {"memory": {"provider": "builtin"}}
OpenVikingMemoryProvider().post_setup(str(hermes_home), config)
save_config.assert_not_called()
assert config == {"memory": {"provider": "builtin"}}
assert ovcli_path.read_text(encoding="utf-8") == original_ovcli
assert not (hermes_home / ".env").exists()
def test_post_setup_manual_retries_base_url_until_reachable(tmp_path, monkeypatch):
_clear_openviking_env(monkeypatch)
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
ovcli_path = tmp_path / "ovcli.conf"
ovcli_path.write_text(json.dumps({"url": "http://old.local"}), encoding="utf-8")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path))
monkeypatch.setattr(openviking_module, "_validate_openviking_auth", lambda values: (True, ""))
reachability_calls = []
def validate_reachability(endpoint):
reachability_calls.append(endpoint)
if endpoint == "http://bad.local:1933":
return False, "OpenViking server is not reachable at http://bad.local:1933."
return True, ""
monkeypatch.setattr(openviking_module, "_validate_openviking_reachability", validate_reachability)
monkeypatch.setattr(openviking_module, "_validate_openviking_root_access", lambda values: (False, "Requires role: root"))
from hermes_cli import memory_setup
prompts = {
"OpenViking server URL": iter(["http://bad.local:1933", "http://localhost:1933"]),
"OpenViking agent": iter(["agent"]),
}
def fake_prompt(label, default=None, secret=False):
return next(prompts[label])
choices = iter([2, 0, 0, 1])
monkeypatch.setattr(
memory_setup,
"_curses_select",
lambda *args, **kwargs: next(choices),
)
monkeypatch.setattr(memory_setup, "_prompt", fake_prompt)
config = {"memory": {}}
OpenVikingMemoryProvider().post_setup(str(hermes_home), config)
assert reachability_calls == ["http://bad.local:1933", "http://localhost:1933"]
assert config["memory"]["provider"] == "openviking"
env_text = (hermes_home / ".env").read_text(encoding="utf-8")
assert "OPENVIKING_ENDPOINT=http://localhost:1933" in env_text
def test_post_setup_manual_retries_user_key_until_status_valid(tmp_path, monkeypatch):
_clear_openviking_env(monkeypatch)
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
ovcli_path = tmp_path / "ovcli.conf"
ovcli_path.write_text(json.dumps({"url": "http://old.local"}), encoding="utf-8")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path))
monkeypatch.setattr(openviking_module, "_validate_openviking_reachability", lambda endpoint: (True, ""))
monkeypatch.setattr(openviking_module, "_validate_openviking_root_access", lambda values: (False, "Requires role: root"))
auth_calls = []
def validate_auth(values):
auth_calls.append(dict(values))
if values["api_key"] == "bad-key":
return False, "OpenViking authentication validation failed: bad key"
return True, ""
monkeypatch.setattr(openviking_module, "_validate_openviking_auth", validate_auth)
from hermes_cli import memory_setup
prompts = {
"OpenViking server URL": iter(["https://openviking.example"]),
"OpenViking user API key": iter(["bad-key", "good-key"]),
"OpenViking agent": iter(["agent", "agent"]),
}
def fake_prompt(label, default=None, secret=False):
return next(prompts[label])
choices = iter([2, 0, 0, 0, 1])
monkeypatch.setattr(
memory_setup,
"_curses_select",
lambda *args, **kwargs: next(choices),
)
monkeypatch.setattr(memory_setup, "_prompt", fake_prompt)
config = {"memory": {}}
OpenVikingMemoryProvider().post_setup(str(hermes_home), config)
assert [call["api_key"] for call in auth_calls] == ["bad-key", "good-key"]
env_text = (hermes_home / ".env").read_text(encoding="utf-8")
assert "OPENVIKING_API_KEY=good-key" in env_text
def test_post_setup_manual_user_key_rejects_root_key(tmp_path, monkeypatch):
_clear_openviking_env(monkeypatch)
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
ovcli_path = tmp_path / "ovcli.conf"
ovcli_path.write_text(json.dumps({"url": "http://old.local"}), encoding="utf-8")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path))
monkeypatch.setattr(openviking_module, "_validate_openviking_reachability", lambda endpoint: (True, ""))
monkeypatch.setattr(openviking_module, "_validate_openviking_auth", lambda values: (True, ""))
root_checks = []
def validate_root(values):
root_checks.append(values["api_key"])
if values["api_key"] == "root-secret":
return True, ""
return False, "Requires role: root"
monkeypatch.setattr(openviking_module, "_validate_openviking_root_access", validate_root)
from hermes_cli import memory_setup
prompts = {
"OpenViking server URL": iter(["https://openviking.example"]),
"OpenViking user API key": iter(["root-secret", "user-secret"]),
"OpenViking agent": iter(["agent", "agent"]),
}
def fake_prompt(label, default=None, secret=False):
return next(prompts[label])
choices = iter([2, 0, 0, 0, 1])
monkeypatch.setattr(
memory_setup,
"_curses_select",
lambda *args, **kwargs: next(choices),
)
monkeypatch.setattr(memory_setup, "_prompt", fake_prompt)
config = {"memory": {}}
OpenVikingMemoryProvider().post_setup(str(hermes_home), config)
assert root_checks == ["root-secret", "user-secret"]
env_text = (hermes_home / ".env").read_text(encoding="utf-8")
assert "OPENVIKING_API_KEY=user-secret" in env_text
assert "OPENVIKING_API_KEY=root-secret" not in env_text
def test_post_setup_manual_root_key_requires_root_only_validation(tmp_path, monkeypatch):
_clear_openviking_env(monkeypatch)
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
ovcli_path = tmp_path / "ovcli.conf"
ovcli_path.write_text(json.dumps({"url": "http://old.local"}), encoding="utf-8")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path))
monkeypatch.setattr(openviking_module, "_validate_openviking_reachability", lambda endpoint: (True, ""))
monkeypatch.setattr(openviking_module, "_validate_openviking_auth", lambda values: (True, ""))
root_calls = []
def validate_root(values):
root_calls.append(dict(values))
return True, ""
monkeypatch.setattr(openviking_module, "_validate_openviking_root_access", validate_root)
from hermes_cli import memory_setup
monkeypatch.setattr(
memory_setup,
"_prompt",
_prompt_from_values({
"OpenViking server URL": "https://openviking.example",
"OpenViking root API key": "root-secret",
"OpenViking account": "acct",
"OpenViking user": "alice",
"OpenViking agent": "agent",
}),
)
choices = iter([2, 1, 1])
monkeypatch.setattr(
memory_setup,
"_curses_select",
lambda *args, **kwargs: next(choices),
)
config = {"memory": {}}
OpenVikingMemoryProvider().post_setup(str(hermes_home), config)
assert [call["api_key"] for call in root_calls] == ["root-secret"]
assert config["memory"]["provider"] == "openviking"
def test_post_setup_manual_retries_root_key_before_account_prompts(tmp_path, monkeypatch):
_clear_openviking_env(monkeypatch)
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
ovcli_path = tmp_path / "ovcli.conf"
ovcli_path.write_text(json.dumps({"url": "http://old.local"}), encoding="utf-8")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path))
monkeypatch.setattr(openviking_module, "_validate_openviking_reachability", lambda endpoint: (True, ""))
monkeypatch.setattr(openviking_module, "_validate_openviking_auth", lambda values: (True, ""))
def validate_root(values):
if values["api_key"] == "bad-root":
return False, "OpenViking root API key validation failed: bad key"
return True, ""
monkeypatch.setattr(openviking_module, "_validate_openviking_root_access", validate_root)
from hermes_cli import memory_setup
prompt_events = []
prompts = {
"OpenViking server URL": iter(["https://openviking.example"]),
"OpenViking root API key": iter(["bad-root", "good-root"]),
"OpenViking account": iter(["acct"]),
"OpenViking user": iter(["alice"]),
"OpenViking agent": iter(["agent"]),
}
def fake_prompt(label, default=None, secret=False):
prompt_events.append(label)
return next(prompts[label])
choices = iter([2, 1, 0, 1, 1])
monkeypatch.setattr(
memory_setup,
"_curses_select",
lambda *args, **kwargs: next(choices),
)
monkeypatch.setattr(memory_setup, "_prompt", fake_prompt)
config = {"memory": {}}
OpenVikingMemoryProvider().post_setup(str(hermes_home), config)
assert prompt_events.index("OpenViking account") > prompt_events.index("OpenViking root API key")
assert prompt_events.count("OpenViking account") == 1
env_text = (hermes_home / ".env").read_text(encoding="utf-8")
assert "OPENVIKING_API_KEY=good-root" in env_text
def test_post_setup_manual_remote_requires_api_key(tmp_path, monkeypatch):
_clear_openviking_env(monkeypatch)
hermes_home = tmp_path / "hermes"
@ -312,7 +623,7 @@ def test_post_setup_manual_remote_requires_api_key(tmp_path, monkeypatch):
save_config = MagicMock()
monkeypatch.setattr(hermes_config, "save_config", save_config)
choices = iter([2, 0])
choices = iter([2, 0, 1])
monkeypatch.setattr(
memory_setup,
"_curses_select",
@ -345,12 +656,13 @@ def test_post_setup_manual_root_requires_account_and_user(tmp_path, monkeypatch)
ovcli_path.write_text(original_ovcli, encoding="utf-8")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path))
_allow_setup_validation(monkeypatch, root_access=True)
from hermes_cli import config as hermes_config
from hermes_cli import memory_setup
save_config = MagicMock()
choices = iter([2, 1])
choices = iter([2, 1, 1])
monkeypatch.setattr(hermes_config, "save_config", save_config)
monkeypatch.setattr(
memory_setup,
@ -386,6 +698,7 @@ def test_post_setup_manual_local_allows_blank_api_key(tmp_path, monkeypatch):
ovcli_path.write_text(original_ovcli, encoding="utf-8")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("OPENVIKING_CLI_CONFIG_FILE", str(ovcli_path))
_allow_setup_validation(monkeypatch)
from hermes_cli import memory_setup
@ -956,3 +1269,144 @@ def test_viking_client_health_sends_auth_headers(monkeypatch):
assert client.health() is True
assert captured["url"] == "https://example.com/health"
assert captured["headers"]["Authorization"] == "Bearer test-key"
def test_viking_client_validate_auth_uses_authenticated_system_status(monkeypatch):
client = _VikingClient(
"https://example.com",
api_key="test-key",
account="acct",
user="alice",
agent="hermes",
)
captured = {}
def capture_get(url, **kwargs):
captured["url"] = url
captured["headers"] = kwargs.get("headers") or {}
return SimpleNamespace(
status_code=200,
text="",
json=lambda: {"status": "ok", "result": {"initialized": True}},
raise_for_status=lambda: None,
)
monkeypatch.setattr(client._httpx, "get", capture_get)
assert client.validate_auth() == {
"status": "ok",
"result": {"initialized": True},
}
assert captured["url"] == "https://example.com/api/v1/system/status"
assert captured["headers"]["Authorization"] == "Bearer test-key"
assert captured["headers"]["X-OpenViking-Account"] == "acct"
assert captured["headers"]["X-OpenViking-User"] == "alice"
def test_viking_client_validate_root_access_uses_admin_accounts(monkeypatch):
client = _VikingClient(
"https://example.com",
api_key="root-key",
account="",
user="",
agent="hermes",
)
captured = {}
def capture_get(url, **kwargs):
captured["url"] = url
captured["headers"] = kwargs.get("headers") or {}
return SimpleNamespace(
status_code=200,
text="",
json=lambda: {"status": "ok", "result": []},
raise_for_status=lambda: None,
)
monkeypatch.setattr(client._httpx, "get", capture_get)
assert client.validate_root_access() == {"status": "ok", "result": []}
assert captured["url"] == "https://example.com/api/v1/admin/accounts"
assert captured["headers"]["Authorization"] == "Bearer root-key"
assert "X-OpenViking-Account" not in captured["headers"]
assert "X-OpenViking-User" not in captured["headers"]
def test_validate_openviking_reachability_uses_health_only(monkeypatch):
events = []
class FakeVikingClient:
def __init__(self, endpoint, api_key="", account="", user="", agent=""):
assert endpoint == "https://openviking.example"
assert api_key == ""
def health(self):
events.append("health")
return True
monkeypatch.setattr(openviking_module, "_VikingClient", FakeVikingClient)
ok, message = openviking_module._validate_openviking_reachability(
"https://openviking.example"
)
assert ok is True
assert message == ""
assert events == ["health"]
def test_validate_openviking_auth_uses_status_without_health(monkeypatch):
events = []
class FakeVikingClient:
def __init__(self, endpoint, api_key="", account="", user="", agent=""):
assert endpoint == "https://openviking.example"
assert api_key == "test-key"
assert account == "acct"
assert user == "alice"
assert agent == "hermes"
def validate_auth(self):
events.append("status")
return {"status": "ok"}
monkeypatch.setattr(openviking_module, "_VikingClient", FakeVikingClient)
ok, message = openviking_module._validate_openviking_auth({
"endpoint": "https://openviking.example",
"api_key": "test-key",
"account": "acct",
"user": "alice",
"agent": "hermes",
})
assert ok is True
assert message == ""
assert events == ["status"]
def test_validate_openviking_root_access_uses_admin_endpoint(monkeypatch):
events = []
class FakeVikingClient:
def __init__(self, endpoint, api_key="", account="", user="", agent=""):
assert endpoint == "https://openviking.example"
assert api_key == "root-key"
assert account == ""
assert user == ""
assert agent == "hermes"
def validate_root_access(self):
events.append("admin")
return {"status": "ok"}
monkeypatch.setattr(openviking_module, "_VikingClient", FakeVikingClient)
ok, message = openviking_module._validate_openviking_root_access({
"endpoint": "https://openviking.example",
"api_key": "root-key",
})
assert ok is True
assert message == ""
assert events == ["admin"]