mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-03 02:11:48 +00:00
Merge branch 'main' of github.com:NousResearch/hermes-agent into feat/ink-refactor
This commit is contained in:
commit
bf54f1fb2f
83 changed files with 5435 additions and 470 deletions
271
tests/hermes_cli/test_completion.py
Normal file
271
tests/hermes_cli/test_completion.py
Normal file
|
|
@ -0,0 +1,271 @@
|
|||
"""Tests for hermes_cli/completion.py — shell completion script generation."""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
from hermes_cli.completion import _walk, generate_bash, generate_zsh, generate_fish
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_parser() -> argparse.ArgumentParser:
|
||||
"""Build a minimal parser that mirrors the real hermes structure."""
|
||||
p = argparse.ArgumentParser(prog="hermes")
|
||||
p.add_argument("--version", "-V", action="store_true")
|
||||
p.add_argument("-p", "--profile", help="Profile name")
|
||||
sub = p.add_subparsers(dest="command")
|
||||
|
||||
chat = sub.add_parser("chat", help="Interactive chat with the agent")
|
||||
chat.add_argument("-q", "--query")
|
||||
chat.add_argument("-m", "--model")
|
||||
|
||||
gw = sub.add_parser("gateway", help="Messaging gateway management")
|
||||
gw_sub = gw.add_subparsers(dest="gateway_command")
|
||||
gw_sub.add_parser("start", help="Start service")
|
||||
gw_sub.add_parser("stop", help="Stop service")
|
||||
gw_sub.add_parser("status", help="Show status")
|
||||
# alias — should NOT appear as a duplicate in completions
|
||||
gw_sub.add_parser("run", aliases=["foreground"], help="Run in foreground")
|
||||
|
||||
sess = sub.add_parser("sessions", help="Manage session history")
|
||||
sess_sub = sess.add_subparsers(dest="sessions_action")
|
||||
sess_sub.add_parser("list", help="List sessions")
|
||||
sess_sub.add_parser("delete", help="Delete a session")
|
||||
|
||||
prof = sub.add_parser("profile", help="Manage profiles")
|
||||
prof_sub = prof.add_subparsers(dest="profile_command")
|
||||
prof_sub.add_parser("list", help="List profiles")
|
||||
prof_sub.add_parser("use", help="Switch to a profile")
|
||||
prof_sub.add_parser("create", help="Create a new profile")
|
||||
prof_sub.add_parser("delete", help="Delete a profile")
|
||||
prof_sub.add_parser("show", help="Show profile details")
|
||||
prof_sub.add_parser("alias", help="Set profile alias")
|
||||
prof_sub.add_parser("rename", help="Rename a profile")
|
||||
prof_sub.add_parser("export", help="Export a profile")
|
||||
|
||||
sub.add_parser("version", help="Show version")
|
||||
|
||||
return p
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 1. Parser extraction
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestWalk:
|
||||
def test_top_level_subcommands_extracted(self):
|
||||
tree = _walk(_make_parser())
|
||||
assert set(tree["subcommands"].keys()) == {"chat", "gateway", "sessions", "profile", "version"}
|
||||
|
||||
def test_nested_subcommands_extracted(self):
|
||||
tree = _walk(_make_parser())
|
||||
gw_subs = set(tree["subcommands"]["gateway"]["subcommands"].keys())
|
||||
assert {"start", "stop", "status", "run"}.issubset(gw_subs)
|
||||
|
||||
def test_aliases_not_duplicated(self):
|
||||
"""'foreground' is an alias of 'run' — must not appear as separate entry."""
|
||||
tree = _walk(_make_parser())
|
||||
gw_subs = tree["subcommands"]["gateway"]["subcommands"]
|
||||
assert "foreground" not in gw_subs
|
||||
|
||||
def test_flags_extracted(self):
|
||||
tree = _walk(_make_parser())
|
||||
chat_flags = tree["subcommands"]["chat"]["flags"]
|
||||
assert "-q" in chat_flags or "--query" in chat_flags
|
||||
|
||||
def test_help_text_captured(self):
|
||||
tree = _walk(_make_parser())
|
||||
assert tree["subcommands"]["chat"]["help"] != ""
|
||||
assert tree["subcommands"]["gateway"]["help"] != ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 2. Bash output
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestGenerateBash:
|
||||
def test_contains_completion_function_and_register(self):
|
||||
out = generate_bash(_make_parser())
|
||||
assert "_hermes_completion()" in out
|
||||
assert "complete -F _hermes_completion hermes" in out
|
||||
|
||||
def test_top_level_commands_present(self):
|
||||
out = generate_bash(_make_parser())
|
||||
for cmd in ("chat", "gateway", "sessions", "version"):
|
||||
assert cmd in out
|
||||
|
||||
def test_nested_subcommands_in_case(self):
|
||||
out = generate_bash(_make_parser())
|
||||
assert "start" in out
|
||||
assert "stop" in out
|
||||
|
||||
def test_valid_bash_syntax(self):
|
||||
"""Script must pass `bash -n` syntax check."""
|
||||
out = generate_bash(_make_parser())
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".bash", delete=False) as f:
|
||||
f.write(out)
|
||||
path = f.name
|
||||
try:
|
||||
result = subprocess.run(["bash", "-n", path], capture_output=True)
|
||||
assert result.returncode == 0, result.stderr.decode()
|
||||
finally:
|
||||
os.unlink(path)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 3. Zsh output
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestGenerateZsh:
|
||||
def test_contains_compdef_header(self):
|
||||
out = generate_zsh(_make_parser())
|
||||
assert "#compdef hermes" in out
|
||||
|
||||
def test_top_level_commands_present(self):
|
||||
out = generate_zsh(_make_parser())
|
||||
for cmd in ("chat", "gateway", "sessions", "version"):
|
||||
assert cmd in out
|
||||
|
||||
def test_nested_describe_blocks(self):
|
||||
out = generate_zsh(_make_parser())
|
||||
assert "_describe" in out
|
||||
# gateway has subcommands so a _cmds array must be generated
|
||||
assert "gateway_cmds" in out
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 4. Fish output
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestGenerateFish:
|
||||
def test_disables_file_completion(self):
|
||||
out = generate_fish(_make_parser())
|
||||
assert "complete -c hermes -f" in out
|
||||
|
||||
def test_top_level_commands_present(self):
|
||||
out = generate_fish(_make_parser())
|
||||
for cmd in ("chat", "gateway", "sessions", "version"):
|
||||
assert cmd in out
|
||||
|
||||
def test_subcommand_guard_present(self):
|
||||
out = generate_fish(_make_parser())
|
||||
assert "__fish_seen_subcommand_from" in out
|
||||
|
||||
def test_valid_fish_syntax(self):
|
||||
"""Script must be accepted by fish without errors."""
|
||||
if not shutil.which("fish"):
|
||||
pytest.skip("fish not installed")
|
||||
out = generate_fish(_make_parser())
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".fish", delete=False) as f:
|
||||
f.write(out)
|
||||
path = f.name
|
||||
try:
|
||||
result = subprocess.run(["fish", path], capture_output=True)
|
||||
assert result.returncode == 0, result.stderr.decode()
|
||||
finally:
|
||||
os.unlink(path)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 5. Subcommand drift prevention
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSubcommandDrift:
|
||||
def test_SUBCOMMANDS_covers_required_commands(self):
|
||||
"""_SUBCOMMANDS must include all known top-level commands so that
|
||||
multi-word session names after -c/-r are never accidentally split.
|
||||
"""
|
||||
import inspect
|
||||
from hermes_cli.main import _coalesce_session_name_args
|
||||
|
||||
source = inspect.getsource(_coalesce_session_name_args)
|
||||
match = re.search(r'_SUBCOMMANDS\s*=\s*\{([^}]+)\}', source, re.DOTALL)
|
||||
assert match, "_SUBCOMMANDS block not found in _coalesce_session_name_args()"
|
||||
defined = set(re.findall(r'"(\w+)"', match.group(1)))
|
||||
|
||||
required = {
|
||||
"chat", "model", "gateway", "setup", "login", "logout", "auth",
|
||||
"status", "cron", "config", "sessions", "version", "update",
|
||||
"uninstall", "profile", "skills", "tools", "mcp", "plugins",
|
||||
"acp", "claw", "honcho", "completion", "logs",
|
||||
}
|
||||
missing = required - defined
|
||||
assert not missing, f"Missing from _SUBCOMMANDS: {missing}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 6. Profile completion (regression prevention)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestProfileCompletion:
|
||||
"""Ensure profile name completion is present in all shell outputs."""
|
||||
|
||||
def test_bash_has_profiles_helper(self):
|
||||
out = generate_bash(_make_parser())
|
||||
assert "_hermes_profiles()" in out
|
||||
assert 'profiles_dir="$HOME/.hermes/profiles"' in out
|
||||
|
||||
def test_bash_completes_profiles_after_p_flag(self):
|
||||
out = generate_bash(_make_parser())
|
||||
assert '"-p"' in out or "== \"-p\"" in out
|
||||
assert '"--profile"' in out or '== "--profile"' in out
|
||||
assert "_hermes_profiles" in out
|
||||
|
||||
def test_bash_profile_subcommand_has_action_completion(self):
|
||||
out = generate_bash(_make_parser())
|
||||
assert "use|delete|show|alias|rename|export)" in out
|
||||
|
||||
def test_bash_profile_actions_complete_profile_names(self):
|
||||
"""After 'hermes profile use', complete with profile names."""
|
||||
out = generate_bash(_make_parser())
|
||||
# The profile case should have _hermes_profiles for name-taking actions
|
||||
lines = out.split("\n")
|
||||
in_profile_case = False
|
||||
has_profiles_in_action = False
|
||||
for line in lines:
|
||||
if "profile)" in line:
|
||||
in_profile_case = True
|
||||
if in_profile_case and "_hermes_profiles" in line:
|
||||
has_profiles_in_action = True
|
||||
break
|
||||
assert has_profiles_in_action, "profile actions should complete with _hermes_profiles"
|
||||
|
||||
def test_zsh_has_profiles_helper(self):
|
||||
out = generate_zsh(_make_parser())
|
||||
assert "_hermes_profiles()" in out
|
||||
assert "$HOME/.hermes/profiles" in out
|
||||
|
||||
def test_zsh_has_profile_flag_completion(self):
|
||||
out = generate_zsh(_make_parser())
|
||||
assert "--profile" in out
|
||||
assert "_hermes_profiles" in out
|
||||
|
||||
def test_zsh_profile_actions_complete_names(self):
|
||||
out = generate_zsh(_make_parser())
|
||||
assert "use|delete|show|alias|rename|export)" in out
|
||||
|
||||
def test_fish_has_profiles_helper(self):
|
||||
out = generate_fish(_make_parser())
|
||||
assert "__hermes_profiles" in out
|
||||
assert "$HOME/.hermes/profiles" in out
|
||||
|
||||
def test_fish_has_profile_flag_completion(self):
|
||||
out = generate_fish(_make_parser())
|
||||
assert "-s p -l profile" in out
|
||||
assert "(__hermes_profiles)" in out
|
||||
|
||||
def test_fish_profile_actions_complete_names(self):
|
||||
out = generate_fish(_make_parser())
|
||||
# Should have profile name completion for actions like use, delete, etc.
|
||||
assert "__hermes_profiles" in out
|
||||
count = out.count("(__hermes_profiles)")
|
||||
# At least the -p flag + the profile action completions
|
||||
assert count >= 2, f"Expected >=2 profile completion entries, got {count}"
|
||||
|
|
@ -40,6 +40,10 @@ class TestProviderEnvDetection:
|
|||
content = "OPENAI_BASE_URL=http://localhost:8080/v1\n"
|
||||
assert _has_provider_env_config(content)
|
||||
|
||||
def test_detects_kimi_cn_api_key(self):
|
||||
content = "KIMI_CN_API_KEY=sk-test\n"
|
||||
assert _has_provider_env_config(content)
|
||||
|
||||
def test_returns_false_when_no_provider_settings(self):
|
||||
content = "TERMINAL_ENV=local\n"
|
||||
assert not _has_provider_env_config(content)
|
||||
|
|
@ -292,3 +296,50 @@ def test_run_doctor_termux_does_not_mark_browser_available_without_agent_browser
|
|||
assert "system dependency not met" in out
|
||||
assert "agent-browser is not installed (expected in the tested Termux path)" in out
|
||||
assert "npm install -g agent-browser && agent-browser install" in out
|
||||
|
||||
|
||||
def test_run_doctor_kimi_cn_env_is_detected_and_probe_is_null_safe(monkeypatch, tmp_path):
|
||||
home = tmp_path / ".hermes"
|
||||
home.mkdir(parents=True, exist_ok=True)
|
||||
(home / "config.yaml").write_text("memory: {}\n", encoding="utf-8")
|
||||
(home / ".env").write_text("KIMI_CN_API_KEY=sk-test\n", encoding="utf-8")
|
||||
project = tmp_path / "project"
|
||||
project.mkdir(exist_ok=True)
|
||||
|
||||
monkeypatch.setattr(doctor_mod, "HERMES_HOME", home)
|
||||
monkeypatch.setattr(doctor_mod, "PROJECT_ROOT", project)
|
||||
monkeypatch.setattr(doctor_mod, "_DHH", str(home))
|
||||
monkeypatch.setenv("KIMI_CN_API_KEY", "sk-test")
|
||||
|
||||
fake_model_tools = types.SimpleNamespace(
|
||||
check_tool_availability=lambda *a, **kw: ([], []),
|
||||
TOOLSET_REQUIREMENTS={},
|
||||
)
|
||||
monkeypatch.setitem(sys.modules, "model_tools", fake_model_tools)
|
||||
|
||||
try:
|
||||
from hermes_cli import auth as _auth_mod
|
||||
monkeypatch.setattr(_auth_mod, "get_nous_auth_status", lambda: {})
|
||||
monkeypatch.setattr(_auth_mod, "get_codex_auth_status", lambda: {})
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
calls = []
|
||||
|
||||
def fake_get(url, headers=None, timeout=None):
|
||||
calls.append((url, headers, timeout))
|
||||
return types.SimpleNamespace(status_code=200)
|
||||
|
||||
import httpx
|
||||
monkeypatch.setattr(httpx, "get", fake_get)
|
||||
|
||||
import io, contextlib
|
||||
buf = io.StringIO()
|
||||
with contextlib.redirect_stdout(buf):
|
||||
doctor_mod.run_doctor(Namespace(fix=False))
|
||||
out = buf.getvalue()
|
||||
|
||||
assert "API key or custom endpoint configured" in out
|
||||
assert "Kimi / Moonshot (China)" in out
|
||||
assert "str expected, not NoneType" not in out
|
||||
assert any(url == "https://api.moonshot.cn/v1/models" for url, _, _ in calls)
|
||||
|
|
|
|||
|
|
@ -108,8 +108,9 @@ class TestWebServerEndpoints:
|
|||
except ImportError:
|
||||
pytest.skip("fastapi/starlette not installed")
|
||||
|
||||
from hermes_cli.web_server import app
|
||||
from hermes_cli.web_server import app, _SESSION_TOKEN
|
||||
self.client = TestClient(app)
|
||||
self.client.headers["Authorization"] = f"Bearer {_SESSION_TOKEN}"
|
||||
|
||||
def test_get_status(self):
|
||||
resp = self.client.get("/api/status")
|
||||
|
|
@ -239,9 +240,13 @@ class TestWebServerEndpoints:
|
|||
|
||||
def test_reveal_env_var_no_token(self, tmp_path):
|
||||
"""POST /api/env/reveal without token should return 401."""
|
||||
from starlette.testclient import TestClient
|
||||
from hermes_cli.web_server import app
|
||||
from hermes_cli.config import save_env_value
|
||||
save_env_value("TEST_REVEAL_NOAUTH", "secret-value")
|
||||
resp = self.client.post(
|
||||
# Use a fresh client WITHOUT the Authorization header
|
||||
unauth_client = TestClient(app)
|
||||
resp = unauth_client.post(
|
||||
"/api/env/reveal",
|
||||
json={"key": "TEST_REVEAL_NOAUTH"},
|
||||
)
|
||||
|
|
@ -258,12 +263,32 @@ class TestWebServerEndpoints:
|
|||
)
|
||||
assert resp.status_code == 401
|
||||
|
||||
def test_session_token_endpoint(self):
|
||||
"""GET /api/auth/session-token should return a token."""
|
||||
from hermes_cli.web_server import _SESSION_TOKEN
|
||||
def test_session_token_endpoint_removed(self):
|
||||
"""GET /api/auth/session-token should no longer exist (token injected via HTML)."""
|
||||
resp = self.client.get("/api/auth/session-token")
|
||||
# The endpoint is gone — the catch-all SPA route serves index.html
|
||||
# or the middleware returns 401 for unauthenticated /api/ paths.
|
||||
assert resp.status_code in (200, 404)
|
||||
# Either way, it must NOT return the token as JSON
|
||||
try:
|
||||
data = resp.json()
|
||||
assert "token" not in data
|
||||
except Exception:
|
||||
pass # Not JSON — that's fine (SPA HTML)
|
||||
|
||||
def test_unauthenticated_api_blocked(self):
|
||||
"""API requests without the session token should be rejected."""
|
||||
from starlette.testclient import TestClient
|
||||
from hermes_cli.web_server import app
|
||||
# Create a client WITHOUT the Authorization header
|
||||
unauth_client = TestClient(app)
|
||||
resp = unauth_client.get("/api/env")
|
||||
assert resp.status_code == 401
|
||||
resp = unauth_client.get("/api/config")
|
||||
assert resp.status_code == 401
|
||||
# Public endpoints should still work
|
||||
resp = unauth_client.get("/api/status")
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["token"] == _SESSION_TOKEN
|
||||
|
||||
def test_path_traversal_blocked(self):
|
||||
"""Verify URL-encoded path traversal is blocked."""
|
||||
|
|
@ -358,8 +383,9 @@ class TestConfigRoundTrip:
|
|||
from starlette.testclient import TestClient
|
||||
except ImportError:
|
||||
pytest.skip("fastapi/starlette not installed")
|
||||
from hermes_cli.web_server import app
|
||||
from hermes_cli.web_server import app, _SESSION_TOKEN
|
||||
self.client = TestClient(app)
|
||||
self.client.headers["Authorization"] = f"Bearer {_SESSION_TOKEN}"
|
||||
|
||||
def test_get_config_no_internal_keys(self):
|
||||
"""GET /api/config should not expose _config_version or _model_meta."""
|
||||
|
|
@ -490,8 +516,9 @@ class TestNewEndpoints:
|
|||
from starlette.testclient import TestClient
|
||||
except ImportError:
|
||||
pytest.skip("fastapi/starlette not installed")
|
||||
from hermes_cli.web_server import app
|
||||
from hermes_cli.web_server import app, _SESSION_TOKEN
|
||||
self.client = TestClient(app)
|
||||
self.client.headers["Authorization"] = f"Bearer {_SESSION_TOKEN}"
|
||||
|
||||
def test_get_logs_default(self):
|
||||
resp = self.client.get("/api/logs")
|
||||
|
|
@ -668,11 +695,16 @@ class TestNewEndpoints:
|
|||
assert isinstance(data["daily"], list)
|
||||
assert "total_sessions" in data["totals"]
|
||||
|
||||
def test_session_token_endpoint(self):
|
||||
from hermes_cli.web_server import _SESSION_TOKEN
|
||||
def test_session_token_endpoint_removed(self):
|
||||
"""GET /api/auth/session-token no longer exists."""
|
||||
resp = self.client.get("/api/auth/session-token")
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["token"] == _SESSION_TOKEN
|
||||
# Should not return a JSON token object
|
||||
assert resp.status_code in (200, 404)
|
||||
try:
|
||||
data = resp.json()
|
||||
assert "token" not in data
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
@ -952,3 +984,195 @@ class TestModelInfoEndpoint:
|
|||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["auto_context_length"] == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gateway health probe tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestProbeGatewayHealth:
|
||||
"""Tests for _probe_gateway_health() — cross-container gateway detection."""
|
||||
|
||||
def test_returns_false_when_no_url_configured(self, monkeypatch):
|
||||
"""When GATEWAY_HEALTH_URL is unset, the probe returns (False, None)."""
|
||||
import hermes_cli.web_server as ws
|
||||
monkeypatch.setattr(ws, "_GATEWAY_HEALTH_URL", None)
|
||||
alive, body = ws._probe_gateway_health()
|
||||
assert alive is False
|
||||
assert body is None
|
||||
|
||||
def test_normalizes_url_with_health_suffix(self, monkeypatch):
|
||||
"""If the user sets the URL to include /health, it's stripped to base."""
|
||||
import hermes_cli.web_server as ws
|
||||
monkeypatch.setattr(ws, "_GATEWAY_HEALTH_URL", "http://gw:8642/health")
|
||||
monkeypatch.setattr(ws, "_GATEWAY_HEALTH_TIMEOUT", 1)
|
||||
# Both paths should fail (no server), but we verify they were constructed
|
||||
# correctly by checking the URLs attempted.
|
||||
calls = []
|
||||
original_urlopen = ws.urllib.request.urlopen
|
||||
|
||||
def mock_urlopen(req, **kwargs):
|
||||
calls.append(req.full_url)
|
||||
raise ConnectionError("mock")
|
||||
|
||||
monkeypatch.setattr(ws.urllib.request, "urlopen", mock_urlopen)
|
||||
alive, body = ws._probe_gateway_health()
|
||||
assert alive is False
|
||||
assert "http://gw:8642/health/detailed" in calls
|
||||
assert "http://gw:8642/health" in calls
|
||||
|
||||
def test_normalizes_url_with_health_detailed_suffix(self, monkeypatch):
|
||||
"""If the user sets the URL to include /health/detailed, it's stripped to base."""
|
||||
import hermes_cli.web_server as ws
|
||||
monkeypatch.setattr(ws, "_GATEWAY_HEALTH_URL", "http://gw:8642/health/detailed")
|
||||
monkeypatch.setattr(ws, "_GATEWAY_HEALTH_TIMEOUT", 1)
|
||||
calls = []
|
||||
|
||||
def mock_urlopen(req, **kwargs):
|
||||
calls.append(req.full_url)
|
||||
raise ConnectionError("mock")
|
||||
|
||||
monkeypatch.setattr(ws.urllib.request, "urlopen", mock_urlopen)
|
||||
ws._probe_gateway_health()
|
||||
assert "http://gw:8642/health/detailed" in calls
|
||||
assert "http://gw:8642/health" in calls
|
||||
|
||||
def test_successful_detailed_probe(self, monkeypatch):
|
||||
"""Successful /health/detailed probe returns (True, body_dict)."""
|
||||
import hermes_cli.web_server as ws
|
||||
monkeypatch.setattr(ws, "_GATEWAY_HEALTH_URL", "http://gw:8642")
|
||||
monkeypatch.setattr(ws, "_GATEWAY_HEALTH_TIMEOUT", 1)
|
||||
|
||||
response_body = json.dumps({
|
||||
"status": "ok",
|
||||
"gateway_state": "running",
|
||||
"pid": 42,
|
||||
})
|
||||
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status = 200
|
||||
mock_resp.read.return_value = response_body.encode()
|
||||
mock_resp.__enter__ = MagicMock(return_value=mock_resp)
|
||||
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
monkeypatch.setattr(ws.urllib.request, "urlopen", lambda req, **kw: mock_resp)
|
||||
alive, body = ws._probe_gateway_health()
|
||||
assert alive is True
|
||||
assert body["status"] == "ok"
|
||||
assert body["pid"] == 42
|
||||
|
||||
def test_detailed_fails_falls_back_to_simple_health(self, monkeypatch):
|
||||
"""If /health/detailed fails, falls back to /health."""
|
||||
import hermes_cli.web_server as ws
|
||||
monkeypatch.setattr(ws, "_GATEWAY_HEALTH_URL", "http://gw:8642")
|
||||
monkeypatch.setattr(ws, "_GATEWAY_HEALTH_TIMEOUT", 1)
|
||||
|
||||
call_count = [0]
|
||||
|
||||
def mock_urlopen(req, **kwargs):
|
||||
call_count[0] += 1
|
||||
if call_count[0] == 1:
|
||||
raise ConnectionError("detailed failed")
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status = 200
|
||||
mock_resp.read.return_value = json.dumps({"status": "ok"}).encode()
|
||||
mock_resp.__enter__ = MagicMock(return_value=mock_resp)
|
||||
mock_resp.__exit__ = MagicMock(return_value=False)
|
||||
return mock_resp
|
||||
|
||||
monkeypatch.setattr(ws.urllib.request, "urlopen", mock_urlopen)
|
||||
alive, body = ws._probe_gateway_health()
|
||||
assert alive is True
|
||||
assert body["status"] == "ok"
|
||||
assert call_count[0] == 2
|
||||
|
||||
|
||||
class TestStatusRemoteGateway:
|
||||
"""Tests for /api/status with remote gateway health fallback."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _setup_test_client(self):
|
||||
try:
|
||||
from starlette.testclient import TestClient
|
||||
except ImportError:
|
||||
pytest.skip("fastapi/starlette not installed")
|
||||
|
||||
from hermes_cli.web_server import app, _SESSION_TOKEN
|
||||
self.client = TestClient(app)
|
||||
self.client.headers["Authorization"] = f"Bearer {_SESSION_TOKEN}"
|
||||
|
||||
def test_status_falls_back_to_remote_probe(self, monkeypatch):
|
||||
"""When local PID check fails and remote probe succeeds, gateway shows running."""
|
||||
import hermes_cli.web_server as ws
|
||||
|
||||
monkeypatch.setattr(ws, "get_running_pid", lambda: None)
|
||||
monkeypatch.setattr(ws, "read_runtime_status", lambda: None)
|
||||
monkeypatch.setattr(ws, "_GATEWAY_HEALTH_URL", "http://gw:8642")
|
||||
monkeypatch.setattr(ws, "_probe_gateway_health", lambda: (True, {
|
||||
"status": "ok",
|
||||
"gateway_state": "running",
|
||||
"platforms": {"telegram": {"state": "connected"}},
|
||||
"pid": 999,
|
||||
}))
|
||||
|
||||
resp = self.client.get("/api/status")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["gateway_running"] is True
|
||||
assert data["gateway_pid"] == 999
|
||||
assert data["gateway_state"] == "running"
|
||||
|
||||
def test_status_remote_probe_not_attempted_when_local_pid_found(self, monkeypatch):
|
||||
"""When local PID check succeeds, the remote probe is never called."""
|
||||
import hermes_cli.web_server as ws
|
||||
|
||||
monkeypatch.setattr(ws, "get_running_pid", lambda: 1234)
|
||||
monkeypatch.setattr(ws, "read_runtime_status", lambda: {
|
||||
"gateway_state": "running",
|
||||
"platforms": {},
|
||||
})
|
||||
monkeypatch.setattr(ws, "_GATEWAY_HEALTH_URL", "http://gw:8642")
|
||||
probe_called = [False]
|
||||
original = ws._probe_gateway_health
|
||||
|
||||
def track_probe():
|
||||
probe_called[0] = True
|
||||
return original()
|
||||
|
||||
monkeypatch.setattr(ws, "_probe_gateway_health", track_probe)
|
||||
|
||||
resp = self.client.get("/api/status")
|
||||
assert resp.status_code == 200
|
||||
assert not probe_called[0]
|
||||
|
||||
def test_status_remote_probe_not_attempted_when_no_url(self, monkeypatch):
|
||||
"""When GATEWAY_HEALTH_URL is unset, no probe is attempted."""
|
||||
import hermes_cli.web_server as ws
|
||||
|
||||
monkeypatch.setattr(ws, "get_running_pid", lambda: None)
|
||||
monkeypatch.setattr(ws, "read_runtime_status", lambda: None)
|
||||
monkeypatch.setattr(ws, "_GATEWAY_HEALTH_URL", None)
|
||||
|
||||
resp = self.client.get("/api/status")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["gateway_running"] is False
|
||||
|
||||
def test_status_remote_running_null_pid(self, monkeypatch):
|
||||
"""Remote gateway running but PID not in response — pid should be None."""
|
||||
import hermes_cli.web_server as ws
|
||||
|
||||
monkeypatch.setattr(ws, "get_running_pid", lambda: None)
|
||||
monkeypatch.setattr(ws, "read_runtime_status", lambda: None)
|
||||
monkeypatch.setattr(ws, "_GATEWAY_HEALTH_URL", "http://gw:8642")
|
||||
monkeypatch.setattr(ws, "_probe_gateway_health", lambda: (True, {
|
||||
"status": "ok",
|
||||
}))
|
||||
|
||||
resp = self.client.get("/api/status")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert data["gateway_running"] is True
|
||||
assert data["gateway_pid"] is None
|
||||
assert data["gateway_state"] == "running"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue