Merge branch 'main' of github.com:NousResearch/hermes-agent into feat/ink-refactor

This commit is contained in:
Brooklyn Nicholson 2026-04-12 13:18:55 -05:00
commit 2aea75e91e
131 changed files with 12350 additions and 1164 deletions

View file

@ -14,6 +14,7 @@ from hermes_cli.auth import (
PROVIDER_REGISTRY,
_read_codex_tokens,
_save_codex_tokens,
_write_codex_cli_tokens,
_import_codex_cli_tokens,
get_codex_auth_status,
get_provider_auth_state,
@ -161,7 +162,7 @@ def test_import_codex_cli_tokens_missing(tmp_path, monkeypatch):
def test_codex_tokens_not_written_to_shared_file(tmp_path, monkeypatch):
"""Verify Hermes never writes to ~/.codex/auth.json."""
"""Verify _save_codex_tokens writes only to Hermes auth store, not ~/.codex/."""
hermes_home = tmp_path / "hermes"
codex_home = tmp_path / "codex-cli"
hermes_home.mkdir(parents=True, exist_ok=True)
@ -173,7 +174,7 @@ def test_codex_tokens_not_written_to_shared_file(tmp_path, monkeypatch):
_save_codex_tokens({"access_token": "hermes-at", "refresh_token": "hermes-rt"})
# ~/.codex/auth.json should NOT exist
# ~/.codex/auth.json should NOT exist — _save_codex_tokens only touches Hermes store
assert not (codex_home / "auth.json").exists()
# Hermes auth store should have the tokens
@ -181,6 +182,98 @@ def test_codex_tokens_not_written_to_shared_file(tmp_path, monkeypatch):
assert data["tokens"]["access_token"] == "hermes-at"
def test_write_codex_cli_tokens_creates_file(tmp_path, monkeypatch):
"""_write_codex_cli_tokens creates ~/.codex/auth.json with refreshed tokens."""
codex_home = tmp_path / "codex-cli"
monkeypatch.setenv("CODEX_HOME", str(codex_home))
_write_codex_cli_tokens("new-access", "new-refresh", last_refresh="2026-04-12T00:00:00Z")
auth_path = codex_home / "auth.json"
assert auth_path.exists()
data = json.loads(auth_path.read_text())
assert data["tokens"]["access_token"] == "new-access"
assert data["tokens"]["refresh_token"] == "new-refresh"
assert data["last_refresh"] == "2026-04-12T00:00:00Z"
# Verify file permissions are restricted
assert (auth_path.stat().st_mode & 0o777) == 0o600
def test_write_codex_cli_tokens_preserves_existing(tmp_path, monkeypatch):
"""_write_codex_cli_tokens preserves extra fields in existing auth.json."""
codex_home = tmp_path / "codex-cli"
codex_home.mkdir(parents=True, exist_ok=True)
monkeypatch.setenv("CODEX_HOME", str(codex_home))
existing = {
"tokens": {
"access_token": "old-access",
"refresh_token": "old-refresh",
"extra_field": "preserved",
},
"last_refresh": "2026-01-01T00:00:00Z",
"custom_key": "keep_me",
}
(codex_home / "auth.json").write_text(json.dumps(existing))
_write_codex_cli_tokens("updated-access", "updated-refresh")
data = json.loads((codex_home / "auth.json").read_text())
assert data["tokens"]["access_token"] == "updated-access"
assert data["tokens"]["refresh_token"] == "updated-refresh"
assert data["tokens"]["extra_field"] == "preserved"
assert data["custom_key"] == "keep_me"
# last_refresh not updated since we didn't pass it
assert data["last_refresh"] == "2026-01-01T00:00:00Z"
def test_write_codex_cli_tokens_handles_missing_dir(tmp_path, monkeypatch):
"""_write_codex_cli_tokens creates parent directories if missing."""
codex_home = tmp_path / "does" / "not" / "exist"
monkeypatch.setenv("CODEX_HOME", str(codex_home))
_write_codex_cli_tokens("at", "rt")
assert (codex_home / "auth.json").exists()
data = json.loads((codex_home / "auth.json").read_text())
assert data["tokens"]["access_token"] == "at"
def test_refresh_codex_auth_tokens_writes_back_to_cli(tmp_path, monkeypatch):
"""After refreshing, _refresh_codex_auth_tokens writes back to ~/.codex/auth.json."""
from hermes_cli.auth import _refresh_codex_auth_tokens
hermes_home = tmp_path / "hermes"
codex_home = tmp_path / "codex-cli"
hermes_home.mkdir(parents=True, exist_ok=True)
codex_home.mkdir(parents=True, exist_ok=True)
(hermes_home / "auth.json").write_text(json.dumps({"version": 1, "providers": {}}))
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("CODEX_HOME", str(codex_home))
# Write initial CLI tokens
(codex_home / "auth.json").write_text(json.dumps({
"tokens": {"access_token": "old-at", "refresh_token": "old-rt"},
}))
# Mock the pure refresh to return new tokens
monkeypatch.setattr("hermes_cli.auth.refresh_codex_oauth_pure", lambda *a, **kw: {
"access_token": "refreshed-at",
"refresh_token": "refreshed-rt",
"last_refresh": "2026-04-12T01:00:00Z",
})
_refresh_codex_auth_tokens(
{"access_token": "old-at", "refresh_token": "old-rt"},
timeout_seconds=10,
)
# Verify CLI file was updated
cli_data = json.loads((codex_home / "auth.json").read_text())
assert cli_data["tokens"]["access_token"] == "refreshed-at"
assert cli_data["tokens"]["refresh_token"] == "refreshed-rt"
def test_resolve_returns_hermes_auth_store_source(tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
_setup_hermes_auth(hermes_home)

View file

@ -0,0 +1,897 @@
"""Tests for hermes backup and import commands."""
import os
import zipfile
from argparse import Namespace
from pathlib import Path
from unittest.mock import patch
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_hermes_tree(root: Path) -> None:
"""Create a realistic ~/.hermes directory structure for testing."""
(root / "config.yaml").write_text("model:\n provider: openrouter\n")
(root / ".env").write_text("OPENROUTER_API_KEY=sk-test-123\n")
(root / "memory_store.db").write_bytes(b"fake-sqlite")
(root / "hermes_state.db").write_bytes(b"fake-state")
# Sessions
(root / "sessions").mkdir(exist_ok=True)
(root / "sessions" / "abc123.json").write_text("{}")
# Skills
(root / "skills").mkdir(exist_ok=True)
(root / "skills" / "my-skill").mkdir()
(root / "skills" / "my-skill" / "SKILL.md").write_text("# My Skill\n")
# Skins
(root / "skins").mkdir(exist_ok=True)
(root / "skins" / "cyber.yaml").write_text("name: cyber\n")
# Cron
(root / "cron").mkdir(exist_ok=True)
(root / "cron" / "jobs.json").write_text("[]")
# Memories
(root / "memories").mkdir(exist_ok=True)
(root / "memories" / "notes.json").write_text("{}")
# Profiles
(root / "profiles").mkdir(exist_ok=True)
(root / "profiles" / "coder").mkdir()
(root / "profiles" / "coder" / "config.yaml").write_text("model:\n provider: anthropic\n")
(root / "profiles" / "coder" / ".env").write_text("ANTHROPIC_API_KEY=sk-ant-123\n")
# hermes-agent repo (should be EXCLUDED)
(root / "hermes-agent").mkdir(exist_ok=True)
(root / "hermes-agent" / "run_agent.py").write_text("# big file\n")
(root / "hermes-agent" / ".git").mkdir()
(root / "hermes-agent" / ".git" / "HEAD").write_text("ref: refs/heads/main\n")
# __pycache__ (should be EXCLUDED)
(root / "plugins").mkdir(exist_ok=True)
(root / "plugins" / "__pycache__").mkdir()
(root / "plugins" / "__pycache__" / "mod.cpython-312.pyc").write_bytes(b"\x00")
# PID files (should be EXCLUDED)
(root / "gateway.pid").write_text("12345")
# Logs (should be included)
(root / "logs").mkdir(exist_ok=True)
(root / "logs" / "agent.log").write_text("log line\n")
# ---------------------------------------------------------------------------
# _should_exclude tests
# ---------------------------------------------------------------------------
class TestShouldExclude:
def test_excludes_hermes_agent(self):
from hermes_cli.backup import _should_exclude
assert _should_exclude(Path("hermes-agent/run_agent.py"))
assert _should_exclude(Path("hermes-agent/.git/HEAD"))
def test_excludes_pycache(self):
from hermes_cli.backup import _should_exclude
assert _should_exclude(Path("plugins/__pycache__/mod.cpython-312.pyc"))
def test_excludes_pyc_files(self):
from hermes_cli.backup import _should_exclude
assert _should_exclude(Path("some/module.pyc"))
def test_excludes_pid_files(self):
from hermes_cli.backup import _should_exclude
assert _should_exclude(Path("gateway.pid"))
assert _should_exclude(Path("cron.pid"))
def test_includes_config(self):
from hermes_cli.backup import _should_exclude
assert not _should_exclude(Path("config.yaml"))
def test_includes_env(self):
from hermes_cli.backup import _should_exclude
assert not _should_exclude(Path(".env"))
def test_includes_skills(self):
from hermes_cli.backup import _should_exclude
assert not _should_exclude(Path("skills/my-skill/SKILL.md"))
def test_includes_profiles(self):
from hermes_cli.backup import _should_exclude
assert not _should_exclude(Path("profiles/coder/config.yaml"))
def test_includes_sessions(self):
from hermes_cli.backup import _should_exclude
assert not _should_exclude(Path("sessions/abc.json"))
def test_includes_logs(self):
from hermes_cli.backup import _should_exclude
assert not _should_exclude(Path("logs/agent.log"))
# ---------------------------------------------------------------------------
# Backup tests
# ---------------------------------------------------------------------------
class TestBackup:
def test_creates_zip(self, tmp_path, monkeypatch):
"""Backup creates a valid zip containing expected files."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
_make_hermes_tree(hermes_home)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
# get_default_hermes_root needs this
monkeypatch.setattr(Path, "home", lambda: tmp_path)
out_zip = tmp_path / "backup.zip"
args = Namespace(output=str(out_zip))
from hermes_cli.backup import run_backup
run_backup(args)
assert out_zip.exists()
with zipfile.ZipFile(out_zip, "r") as zf:
names = zf.namelist()
# Config should be present
assert "config.yaml" in names
assert ".env" in names
# Skills
assert "skills/my-skill/SKILL.md" in names
# Profiles
assert "profiles/coder/config.yaml" in names
assert "profiles/coder/.env" in names
# Sessions
assert "sessions/abc123.json" in names
# Logs
assert "logs/agent.log" in names
# Skins
assert "skins/cyber.yaml" in names
def test_excludes_hermes_agent(self, tmp_path, monkeypatch):
"""Backup does NOT include hermes-agent/ directory."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
_make_hermes_tree(hermes_home)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
out_zip = tmp_path / "backup.zip"
args = Namespace(output=str(out_zip))
from hermes_cli.backup import run_backup
run_backup(args)
with zipfile.ZipFile(out_zip, "r") as zf:
names = zf.namelist()
agent_files = [n for n in names if "hermes-agent" in n]
assert agent_files == [], f"hermes-agent files leaked into backup: {agent_files}"
def test_excludes_pycache(self, tmp_path, monkeypatch):
"""Backup does NOT include __pycache__ dirs."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
_make_hermes_tree(hermes_home)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
out_zip = tmp_path / "backup.zip"
args = Namespace(output=str(out_zip))
from hermes_cli.backup import run_backup
run_backup(args)
with zipfile.ZipFile(out_zip, "r") as zf:
names = zf.namelist()
pycache_files = [n for n in names if "__pycache__" in n]
assert pycache_files == []
def test_excludes_pid_files(self, tmp_path, monkeypatch):
"""Backup does NOT include PID files."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
_make_hermes_tree(hermes_home)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
out_zip = tmp_path / "backup.zip"
args = Namespace(output=str(out_zip))
from hermes_cli.backup import run_backup
run_backup(args)
with zipfile.ZipFile(out_zip, "r") as zf:
names = zf.namelist()
pid_files = [n for n in names if n.endswith(".pid")]
assert pid_files == []
def test_default_output_path(self, tmp_path, monkeypatch):
"""When no output path given, zip goes to ~/hermes-backup-*.zip."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / "config.yaml").write_text("model: test\n")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
args = Namespace(output=None)
from hermes_cli.backup import run_backup
run_backup(args)
# Should exist in home dir
zips = list(tmp_path.glob("hermes-backup-*.zip"))
assert len(zips) == 1
# ---------------------------------------------------------------------------
# Import tests
# ---------------------------------------------------------------------------
class TestImport:
def _make_backup_zip(self, zip_path: Path, files: dict[str, str | bytes]) -> None:
"""Create a test zip with given files."""
with zipfile.ZipFile(zip_path, "w") as zf:
for name, content in files.items():
if isinstance(content, bytes):
zf.writestr(name, content)
else:
zf.writestr(name, content)
def test_restores_files(self, tmp_path, monkeypatch):
"""Import extracts files into hermes home."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
zip_path = tmp_path / "backup.zip"
self._make_backup_zip(zip_path, {
"config.yaml": "model:\n provider: openrouter\n",
".env": "OPENROUTER_API_KEY=sk-test\n",
"skills/my-skill/SKILL.md": "# My Skill\n",
"profiles/coder/config.yaml": "model:\n provider: anthropic\n",
})
args = Namespace(zipfile=str(zip_path), force=True)
from hermes_cli.backup import run_import
run_import(args)
assert (hermes_home / "config.yaml").read_text() == "model:\n provider: openrouter\n"
assert (hermes_home / ".env").read_text() == "OPENROUTER_API_KEY=sk-test\n"
assert (hermes_home / "skills" / "my-skill" / "SKILL.md").read_text() == "# My Skill\n"
assert (hermes_home / "profiles" / "coder" / "config.yaml").exists()
def test_strips_hermes_prefix(self, tmp_path, monkeypatch):
"""Import strips .hermes/ prefix if all entries share it."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
zip_path = tmp_path / "backup.zip"
self._make_backup_zip(zip_path, {
".hermes/config.yaml": "model: test\n",
".hermes/skills/a/SKILL.md": "# A\n",
})
args = Namespace(zipfile=str(zip_path), force=True)
from hermes_cli.backup import run_import
run_import(args)
assert (hermes_home / "config.yaml").read_text() == "model: test\n"
assert (hermes_home / "skills" / "a" / "SKILL.md").read_text() == "# A\n"
def test_rejects_empty_zip(self, tmp_path, monkeypatch):
"""Import rejects an empty zip."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
zip_path = tmp_path / "empty.zip"
with zipfile.ZipFile(zip_path, "w"):
pass # empty
args = Namespace(zipfile=str(zip_path), force=True)
from hermes_cli.backup import run_import
with pytest.raises(SystemExit):
run_import(args)
def test_rejects_non_hermes_zip(self, tmp_path, monkeypatch):
"""Import rejects a zip that doesn't look like a hermes backup."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
zip_path = tmp_path / "random.zip"
self._make_backup_zip(zip_path, {
"some/random/file.txt": "hello",
"another/thing.json": "{}",
})
args = Namespace(zipfile=str(zip_path), force=True)
from hermes_cli.backup import run_import
with pytest.raises(SystemExit):
run_import(args)
def test_blocks_path_traversal(self, tmp_path, monkeypatch):
"""Import blocks zip entries with path traversal."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
zip_path = tmp_path / "evil.zip"
# Include a marker file so validation passes
self._make_backup_zip(zip_path, {
"config.yaml": "model: test\n",
"../../etc/passwd": "root:x:0:0\n",
})
args = Namespace(zipfile=str(zip_path), force=True)
from hermes_cli.backup import run_import
run_import(args)
# config.yaml should be restored
assert (hermes_home / "config.yaml").exists()
# traversal file should NOT exist outside hermes home
assert not (tmp_path / "etc" / "passwd").exists()
def test_confirmation_prompt_abort(self, tmp_path, monkeypatch):
"""Import aborts when user says no to confirmation."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
# Pre-existing config triggers the confirmation
(hermes_home / "config.yaml").write_text("existing: true\n")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
zip_path = tmp_path / "backup.zip"
self._make_backup_zip(zip_path, {
"config.yaml": "model: restored\n",
})
args = Namespace(zipfile=str(zip_path), force=False)
from hermes_cli.backup import run_import
with patch("builtins.input", return_value="n"):
run_import(args)
# Original config should be unchanged
assert (hermes_home / "config.yaml").read_text() == "existing: true\n"
def test_force_skips_confirmation(self, tmp_path, monkeypatch):
"""Import with --force skips confirmation and overwrites."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / "config.yaml").write_text("existing: true\n")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
zip_path = tmp_path / "backup.zip"
self._make_backup_zip(zip_path, {
"config.yaml": "model: restored\n",
})
args = Namespace(zipfile=str(zip_path), force=True)
from hermes_cli.backup import run_import
run_import(args)
assert (hermes_home / "config.yaml").read_text() == "model: restored\n"
def test_missing_file_exits(self, tmp_path, monkeypatch):
"""Import exits with error for nonexistent file."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
args = Namespace(zipfile=str(tmp_path / "nonexistent.zip"), force=True)
from hermes_cli.backup import run_import
with pytest.raises(SystemExit):
run_import(args)
# ---------------------------------------------------------------------------
# Round-trip test
# ---------------------------------------------------------------------------
class TestRoundTrip:
def test_backup_then_import(self, tmp_path, monkeypatch):
"""Full round-trip: backup -> import to a new location -> verify."""
# Source
src_home = tmp_path / "source" / ".hermes"
src_home.mkdir(parents=True)
_make_hermes_tree(src_home)
monkeypatch.setenv("HERMES_HOME", str(src_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path / "source")
# Backup
out_zip = tmp_path / "roundtrip.zip"
from hermes_cli.backup import run_backup, run_import
run_backup(Namespace(output=str(out_zip)))
assert out_zip.exists()
# Import into a different location
dst_home = tmp_path / "dest" / ".hermes"
dst_home.mkdir(parents=True)
monkeypatch.setenv("HERMES_HOME", str(dst_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path / "dest")
run_import(Namespace(zipfile=str(out_zip), force=True))
# Verify key files
assert (dst_home / "config.yaml").read_text() == "model:\n provider: openrouter\n"
assert (dst_home / ".env").read_text() == "OPENROUTER_API_KEY=sk-test-123\n"
assert (dst_home / "skills" / "my-skill" / "SKILL.md").exists()
assert (dst_home / "profiles" / "coder" / "config.yaml").exists()
assert (dst_home / "sessions" / "abc123.json").exists()
assert (dst_home / "logs" / "agent.log").exists()
# hermes-agent should NOT be present
assert not (dst_home / "hermes-agent").exists()
# __pycache__ should NOT be present
assert not (dst_home / "plugins" / "__pycache__").exists()
# PID files should NOT be present
assert not (dst_home / "gateway.pid").exists()
# ---------------------------------------------------------------------------
# Validate / detect-prefix unit tests
# ---------------------------------------------------------------------------
class TestFormatSize:
def test_bytes(self):
from hermes_cli.backup import _format_size
assert _format_size(512) == "512 B"
def test_kilobytes(self):
from hermes_cli.backup import _format_size
assert "KB" in _format_size(2048)
def test_megabytes(self):
from hermes_cli.backup import _format_size
assert "MB" in _format_size(5 * 1024 * 1024)
def test_gigabytes(self):
from hermes_cli.backup import _format_size
assert "GB" in _format_size(3 * 1024 ** 3)
def test_terabytes(self):
from hermes_cli.backup import _format_size
assert "TB" in _format_size(2 * 1024 ** 4)
class TestValidation:
def test_validate_with_config(self):
"""Zip with config.yaml passes validation."""
import io
from hermes_cli.backup import _validate_backup_zip
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("config.yaml", "test")
buf.seek(0)
with zipfile.ZipFile(buf, "r") as zf:
ok, reason = _validate_backup_zip(zf)
assert ok
def test_validate_with_env(self):
"""Zip with .env passes validation."""
import io
from hermes_cli.backup import _validate_backup_zip
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr(".env", "KEY=val")
buf.seek(0)
with zipfile.ZipFile(buf, "r") as zf:
ok, reason = _validate_backup_zip(zf)
assert ok
def test_validate_rejects_random(self):
"""Zip without hermes markers fails validation."""
import io
from hermes_cli.backup import _validate_backup_zip
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("random/file.txt", "hello")
buf.seek(0)
with zipfile.ZipFile(buf, "r") as zf:
ok, reason = _validate_backup_zip(zf)
assert not ok
def test_detect_prefix_hermes(self):
"""Detects .hermes/ prefix wrapping all entries."""
import io
from hermes_cli.backup import _detect_prefix
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr(".hermes/config.yaml", "test")
zf.writestr(".hermes/skills/a/SKILL.md", "skill")
buf.seek(0)
with zipfile.ZipFile(buf, "r") as zf:
assert _detect_prefix(zf) == ".hermes/"
def test_detect_prefix_none(self):
"""No prefix when entries are at root."""
import io
from hermes_cli.backup import _detect_prefix
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("config.yaml", "test")
zf.writestr("skills/a/SKILL.md", "skill")
buf.seek(0)
with zipfile.ZipFile(buf, "r") as zf:
assert _detect_prefix(zf) == ""
def test_detect_prefix_only_dirs(self):
"""Prefix detection returns empty for zip with only directory entries."""
import io
from hermes_cli.backup import _detect_prefix
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
# Only directory entries (trailing slash)
zf.writestr(".hermes/", "")
zf.writestr(".hermes/skills/", "")
buf.seek(0)
with zipfile.ZipFile(buf, "r") as zf:
assert _detect_prefix(zf) == ""
# ---------------------------------------------------------------------------
# Edge case tests for uncovered paths
# ---------------------------------------------------------------------------
class TestBackupEdgeCases:
def test_nonexistent_hermes_home(self, tmp_path, monkeypatch):
"""Backup exits when hermes home doesn't exist."""
fake_home = tmp_path / "nonexistent" / ".hermes"
monkeypatch.setenv("HERMES_HOME", str(fake_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path / "nonexistent")
args = Namespace(output=str(tmp_path / "out.zip"))
from hermes_cli.backup import run_backup
with pytest.raises(SystemExit):
run_backup(args)
def test_output_is_directory(self, tmp_path, monkeypatch):
"""When output path is a directory, zip is created inside it."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / "config.yaml").write_text("model: test\n")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
out_dir = tmp_path / "backups"
out_dir.mkdir()
args = Namespace(output=str(out_dir))
from hermes_cli.backup import run_backup
run_backup(args)
zips = list(out_dir.glob("hermes-backup-*.zip"))
assert len(zips) == 1
def test_output_without_zip_suffix(self, tmp_path, monkeypatch):
"""Output path without .zip gets suffix appended."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / "config.yaml").write_text("model: test\n")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
out_path = tmp_path / "mybackup.tar"
args = Namespace(output=str(out_path))
from hermes_cli.backup import run_backup
run_backup(args)
# Should have .tar.zip suffix
assert (tmp_path / "mybackup.tar.zip").exists()
def test_empty_hermes_home(self, tmp_path, monkeypatch):
"""Backup handles empty hermes home (no files to back up)."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
# Only excluded dirs, no actual files
(hermes_home / "__pycache__").mkdir()
(hermes_home / "__pycache__" / "foo.pyc").write_bytes(b"\x00")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
args = Namespace(output=str(tmp_path / "out.zip"))
from hermes_cli.backup import run_backup
run_backup(args)
# No zip should be created
assert not (tmp_path / "out.zip").exists()
def test_permission_error_during_backup(self, tmp_path, monkeypatch):
"""Backup handles permission errors gracefully."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / "config.yaml").write_text("model: test\n")
# Create an unreadable file
bad_file = hermes_home / "secret.db"
bad_file.write_text("data")
bad_file.chmod(0o000)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
out_zip = tmp_path / "out.zip"
args = Namespace(output=str(out_zip))
from hermes_cli.backup import run_backup
try:
run_backup(args)
finally:
# Restore permissions for cleanup
bad_file.chmod(0o644)
# Zip should still be created with the readable files
assert out_zip.exists()
def test_skips_output_zip_inside_hermes(self, tmp_path, monkeypatch):
"""Backup skips its own output zip if it's inside hermes root."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / "config.yaml").write_text("model: test\n")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
# Output inside hermes home
out_zip = hermes_home / "backup.zip"
args = Namespace(output=str(out_zip))
from hermes_cli.backup import run_backup
run_backup(args)
# The zip should exist but not contain itself
assert out_zip.exists()
with zipfile.ZipFile(out_zip, "r") as zf:
assert "backup.zip" not in zf.namelist()
class TestImportEdgeCases:
def _make_backup_zip(self, zip_path: Path, files: dict[str, str | bytes]) -> None:
with zipfile.ZipFile(zip_path, "w") as zf:
for name, content in files.items():
zf.writestr(name, content)
def test_not_a_zip(self, tmp_path, monkeypatch):
"""Import rejects a non-zip file."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
not_zip = tmp_path / "fake.zip"
not_zip.write_text("this is not a zip")
args = Namespace(zipfile=str(not_zip), force=True)
from hermes_cli.backup import run_import
with pytest.raises(SystemExit):
run_import(args)
def test_eof_during_confirmation(self, tmp_path, monkeypatch):
"""Import handles EOFError during confirmation prompt."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / "config.yaml").write_text("existing\n")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
zip_path = tmp_path / "backup.zip"
self._make_backup_zip(zip_path, {"config.yaml": "new\n"})
args = Namespace(zipfile=str(zip_path), force=False)
from hermes_cli.backup import run_import
with patch("builtins.input", side_effect=EOFError):
with pytest.raises(SystemExit):
run_import(args)
def test_keyboard_interrupt_during_confirmation(self, tmp_path, monkeypatch):
"""Import handles KeyboardInterrupt during confirmation prompt."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / ".env").write_text("KEY=val\n")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
zip_path = tmp_path / "backup.zip"
self._make_backup_zip(zip_path, {"config.yaml": "new\n"})
args = Namespace(zipfile=str(zip_path), force=False)
from hermes_cli.backup import run_import
with patch("builtins.input", side_effect=KeyboardInterrupt):
with pytest.raises(SystemExit):
run_import(args)
def test_permission_error_during_import(self, tmp_path, monkeypatch):
"""Import handles permission errors during extraction."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
# Create a read-only directory so extraction fails
locked_dir = hermes_home / "locked"
locked_dir.mkdir()
locked_dir.chmod(0o555)
zip_path = tmp_path / "backup.zip"
self._make_backup_zip(zip_path, {
"config.yaml": "model: test\n",
"locked/secret.txt": "data",
})
args = Namespace(zipfile=str(zip_path), force=True)
from hermes_cli.backup import run_import
try:
run_import(args)
finally:
locked_dir.chmod(0o755)
# config.yaml should still be restored despite the error
assert (hermes_home / "config.yaml").exists()
def test_progress_with_many_files(self, tmp_path, monkeypatch):
"""Import shows progress with 500+ files."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
zip_path = tmp_path / "big.zip"
files = {"config.yaml": "model: test\n"}
for i in range(600):
files[f"sessions/s{i:04d}.json"] = "{}"
self._make_backup_zip(zip_path, files)
args = Namespace(zipfile=str(zip_path), force=True)
from hermes_cli.backup import run_import
run_import(args)
assert (hermes_home / "config.yaml").exists()
assert (hermes_home / "sessions" / "s0599.json").exists()
# ---------------------------------------------------------------------------
# Profile restoration tests
# ---------------------------------------------------------------------------
class TestProfileRestoration:
def _make_backup_zip(self, zip_path: Path, files: dict[str, str | bytes]) -> None:
with zipfile.ZipFile(zip_path, "w") as zf:
for name, content in files.items():
zf.writestr(name, content)
def test_import_creates_profile_wrappers(self, tmp_path, monkeypatch):
"""Import auto-creates wrapper scripts for restored profiles."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
# Mock the wrapper dir to be inside tmp_path
wrapper_dir = tmp_path / ".local" / "bin"
wrapper_dir.mkdir(parents=True)
zip_path = tmp_path / "backup.zip"
self._make_backup_zip(zip_path, {
"config.yaml": "model:\n provider: openrouter\n",
"profiles/coder/config.yaml": "model:\n provider: anthropic\n",
"profiles/coder/.env": "ANTHROPIC_API_KEY=sk-test\n",
"profiles/researcher/config.yaml": "model:\n provider: deepseek\n",
})
args = Namespace(zipfile=str(zip_path), force=True)
from hermes_cli.backup import run_import
run_import(args)
# Profile directories should exist
assert (hermes_home / "profiles" / "coder" / "config.yaml").exists()
assert (hermes_home / "profiles" / "researcher" / "config.yaml").exists()
# Wrapper scripts should be created
assert (wrapper_dir / "coder").exists()
assert (wrapper_dir / "researcher").exists()
# Wrappers should contain the right content
coder_wrapper = (wrapper_dir / "coder").read_text()
assert "hermes -p coder" in coder_wrapper
def test_import_skips_profile_dirs_without_config(self, tmp_path, monkeypatch):
"""Import doesn't create wrappers for profile dirs without config."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
wrapper_dir = tmp_path / ".local" / "bin"
wrapper_dir.mkdir(parents=True)
zip_path = tmp_path / "backup.zip"
self._make_backup_zip(zip_path, {
"config.yaml": "model: test\n",
"profiles/valid/config.yaml": "model: test\n",
"profiles/empty/readme.txt": "nothing here\n",
})
args = Namespace(zipfile=str(zip_path), force=True)
from hermes_cli.backup import run_import
run_import(args)
# Only valid profile should get a wrapper
assert (wrapper_dir / "valid").exists()
assert not (wrapper_dir / "empty").exists()
def test_import_without_profiles_module(self, tmp_path, monkeypatch):
"""Import gracefully handles missing profiles module (fresh install)."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
zip_path = tmp_path / "backup.zip"
self._make_backup_zip(zip_path, {
"config.yaml": "model: test\n",
"profiles/coder/config.yaml": "model: test\n",
})
args = Namespace(zipfile=str(zip_path), force=True)
# Simulate profiles module not being available
import hermes_cli.backup as backup_mod
original_import = __builtins__.__import__ if hasattr(__builtins__, '__import__') else __import__
def fake_import(name, *a, **kw):
if name == "hermes_cli.profiles":
raise ImportError("no profiles module")
return original_import(name, *a, **kw)
from hermes_cli.backup import run_import
with patch("builtins.__import__", side_effect=fake_import):
run_import(args)
# Files should still be restored even if wrappers can't be created
assert (hermes_home / "profiles" / "coder" / "config.yaml").exists()

View file

@ -58,13 +58,13 @@ class TestFindOpenclawDirs:
def test_finds_legacy_dirs(self, tmp_path):
clawdbot = tmp_path / ".clawdbot"
clawdbot.mkdir()
moldbot = tmp_path / ".moldbot"
moldbot.mkdir()
moltbot = tmp_path / ".moltbot"
moltbot.mkdir()
with patch("pathlib.Path.home", return_value=tmp_path):
found = claw_mod._find_openclaw_dirs()
assert len(found) == 2
assert clawdbot in found
assert moldbot in found
assert moltbot in found
def test_returns_empty_when_none_exist(self, tmp_path):
with patch("pathlib.Path.home", return_value=tmp_path):
@ -297,7 +297,6 @@ class TestCmdMigrate:
patch.object(claw_mod, "_load_migration_module", return_value=fake_mod),
patch.object(claw_mod, "get_config_path", return_value=config_path),
patch.object(claw_mod, "prompt_yes_no", return_value=True),
patch.object(claw_mod, "_offer_source_archival"),
patch("sys.stdin", mock_stdin),
):
claw_mod._cmd_migrate(args)
@ -306,43 +305,8 @@ class TestCmdMigrate:
assert "Migration Results" in captured.out
assert "Migration complete!" in captured.out
def test_execute_offers_archival_on_success(self, tmp_path, capsys):
"""After successful migration, _offer_source_archival should be called."""
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
fake_mod = ModuleType("openclaw_to_hermes")
fake_mod.resolve_selected_options = MagicMock(return_value={"soul"})
fake_migrator = MagicMock()
fake_migrator.migrate.return_value = {
"summary": {"migrated": 3, "skipped": 0, "conflict": 0, "error": 0},
"items": [
{"kind": "soul", "status": "migrated", "destination": str(tmp_path / "SOUL.md")},
],
}
fake_mod.Migrator = MagicMock(return_value=fake_migrator)
args = Namespace(
source=str(openclaw_dir),
dry_run=False, preset="full", overwrite=False,
migrate_secrets=False, workspace_target=None,
skill_conflict="skip", yes=True,
)
with (
patch.object(claw_mod, "_find_migration_script", return_value=tmp_path / "s.py"),
patch.object(claw_mod, "_load_migration_module", return_value=fake_mod),
patch.object(claw_mod, "get_config_path", return_value=tmp_path / "config.yaml"),
patch.object(claw_mod, "save_config"),
patch.object(claw_mod, "load_config", return_value={}),
patch.object(claw_mod, "_offer_source_archival") as mock_archival,
):
claw_mod._cmd_migrate(args)
mock_archival.assert_called_once_with(openclaw_dir, True)
def test_dry_run_skips_archival(self, tmp_path, capsys):
"""Dry run should not offer archival."""
def test_dry_run_does_not_touch_source(self, tmp_path, capsys):
"""Dry run should not modify the source directory."""
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
@ -369,11 +333,10 @@ class TestCmdMigrate:
patch.object(claw_mod, "get_config_path", return_value=tmp_path / "config.yaml"),
patch.object(claw_mod, "save_config"),
patch.object(claw_mod, "load_config", return_value={}),
patch.object(claw_mod, "_offer_source_archival") as mock_archival,
):
claw_mod._cmd_migrate(args)
mock_archival.assert_not_called()
assert openclaw_dir.is_dir() # Source untouched
def test_execute_cancelled_by_user(self, tmp_path, capsys):
openclaw_dir = tmp_path / ".openclaw"
@ -506,73 +469,6 @@ class TestCmdMigrate:
assert call_kwargs["migrate_secrets"] is True
# ---------------------------------------------------------------------------
# _offer_source_archival
# ---------------------------------------------------------------------------
class TestOfferSourceArchival:
"""Test the post-migration archival offer."""
def test_archives_with_auto_yes(self, tmp_path, capsys):
source = tmp_path / ".openclaw"
source.mkdir()
(source / "workspace").mkdir()
(source / "workspace" / "todo.json").write_text("{}")
claw_mod._offer_source_archival(source, auto_yes=True)
captured = capsys.readouterr()
assert "Archived" in captured.out
assert not source.exists()
assert (tmp_path / ".openclaw.pre-migration").is_dir()
def test_skips_when_user_declines(self, tmp_path, capsys):
source = tmp_path / ".openclaw"
source.mkdir()
mock_stdin = MagicMock()
mock_stdin.isatty.return_value = True
with (
patch.object(claw_mod, "prompt_yes_no", return_value=False),
patch("sys.stdin", mock_stdin),
):
claw_mod._offer_source_archival(source, auto_yes=False)
captured = capsys.readouterr()
assert "Skipped" in captured.out
assert source.is_dir() # Still exists
def test_noop_when_source_missing(self, tmp_path, capsys):
claw_mod._offer_source_archival(tmp_path / "nonexistent", auto_yes=True)
captured = capsys.readouterr()
assert captured.out == "" # No output
def test_shows_state_files(self, tmp_path, capsys):
source = tmp_path / ".openclaw"
source.mkdir()
ws = source / "workspace"
ws.mkdir()
(ws / "todo.json").write_text("{}")
with patch.object(claw_mod, "prompt_yes_no", return_value=False):
claw_mod._offer_source_archival(source, auto_yes=False)
captured = capsys.readouterr()
assert "todo.json" in captured.out
def test_handles_archive_error(self, tmp_path, capsys):
source = tmp_path / ".openclaw"
source.mkdir()
with patch.object(claw_mod, "_archive_directory", side_effect=OSError("permission denied")):
claw_mod._offer_source_archival(source, auto_yes=True)
captured = capsys.readouterr()
assert "Could not archive" in captured.out
# ---------------------------------------------------------------------------
# _cmd_cleanup
# ---------------------------------------------------------------------------

View file

@ -0,0 +1,254 @@
"""Tests for the interactive CLI /model picker (provider → model drill-down)."""
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
class _FakeBuffer:
def __init__(self, text="draft text"):
self.text = text
self.cursor_position = len(text)
self.reset_calls = []
def reset(self, append_to_history=False):
self.reset_calls.append(append_to_history)
self.text = ""
self.cursor_position = 0
def _make_providers():
return [
{
"slug": "openrouter",
"name": "OpenRouter",
"is_current": True,
"is_user_defined": False,
"models": ["anthropic/claude-opus-4.6", "openai/gpt-5.4"],
"total_models": 2,
"source": "built-in",
},
{
"slug": "anthropic",
"name": "Anthropic",
"is_current": False,
"is_user_defined": False,
"models": ["claude-opus-4.6", "claude-sonnet-4.6"],
"total_models": 2,
"source": "built-in",
},
{
"slug": "custom:my-ollama",
"name": "My Ollama",
"is_current": False,
"is_user_defined": True,
"models": ["llama3", "mistral"],
"total_models": 2,
"source": "user-config",
"api_url": "http://localhost:11434/v1",
},
]
def _make_picker_cli(picker_return_value):
cli = MagicMock()
cli._run_curses_picker = MagicMock(return_value=picker_return_value)
cli._app = MagicMock()
cli._status_bar_visible = True
return cli
def _make_modal_cli():
from cli import HermesCLI
cli = HermesCLI.__new__(HermesCLI)
cli.model = "gpt-5.4"
cli.provider = "openrouter"
cli.requested_provider = "openrouter"
cli.base_url = ""
cli.api_key = ""
cli.api_mode = ""
cli._explicit_api_key = ""
cli._explicit_base_url = ""
cli._pending_model_switch_note = None
cli._model_picker_state = None
cli._modal_input_snapshot = None
cli._status_bar_visible = True
cli._invalidate = MagicMock()
cli.agent = None
cli.config = {}
cli.console = MagicMock()
cli._app = SimpleNamespace(
current_buffer=_FakeBuffer(),
invalidate=MagicMock(),
)
return cli
def test_provider_selection_returns_slug_on_choice():
providers = _make_providers()
cli = _make_picker_cli(1)
from cli import HermesCLI
result = HermesCLI._interactive_provider_selection(cli, providers, "gpt-5.4", "OpenRouter")
assert result == "anthropic"
cli._run_curses_picker.assert_called_once()
def test_provider_selection_returns_none_on_cancel():
providers = _make_providers()
cli = _make_picker_cli(None)
from cli import HermesCLI
result = HermesCLI._interactive_provider_selection(cli, providers, "gpt-5.4", "OpenRouter")
assert result is None
def test_provider_selection_default_is_current():
providers = _make_providers()
cli = _make_picker_cli(0)
from cli import HermesCLI
HermesCLI._interactive_provider_selection(cli, providers, "gpt-5.4", "OpenRouter")
assert cli._run_curses_picker.call_args.kwargs["default_index"] == 0
def test_model_selection_returns_model_on_choice():
provider_data = _make_providers()[0]
cli = _make_picker_cli(0)
from cli import HermesCLI
result = HermesCLI._interactive_model_selection(cli, provider_data["models"], provider_data)
assert result == "anthropic/claude-opus-4.6"
def test_model_selection_custom_entry_prompts_for_input():
provider_data = _make_providers()[0]
cli = _make_picker_cli(2)
from cli import HermesCLI
cli._prompt_text_input = MagicMock(return_value="my-custom-model")
result = HermesCLI._interactive_model_selection(cli, provider_data["models"], provider_data)
assert result == "my-custom-model"
cli._prompt_text_input.assert_called_once_with(" Enter model name: ")
def test_model_selection_empty_prompts_for_manual_input():
provider_data = {
"slug": "custom:empty",
"name": "Empty Provider",
"models": [],
"total_models": 0,
}
cli = _make_picker_cli(None)
from cli import HermesCLI
cli._prompt_text_input = MagicMock(return_value="my-model")
result = HermesCLI._interactive_model_selection(cli, [], provider_data)
assert result == "my-model"
cli._prompt_text_input.assert_called_once_with(" Enter model name manually (or Enter to cancel): ")
def test_prompt_text_input_uses_run_in_terminal_when_app_active():
from cli import HermesCLI
cli = _make_modal_cli()
with (
patch("prompt_toolkit.application.run_in_terminal", side_effect=lambda fn: fn()) as run_mock,
patch("builtins.input", return_value="manual-value"),
):
result = HermesCLI._prompt_text_input(cli, "Enter value: ")
assert result == "manual-value"
run_mock.assert_called_once()
assert cli._status_bar_visible is True
def test_should_handle_model_command_inline_uses_command_name_resolution():
from cli import HermesCLI
cli = _make_modal_cli()
with patch("hermes_cli.commands.resolve_command", return_value=SimpleNamespace(name="model")):
assert HermesCLI._should_handle_model_command_inline(cli, "/model") is True
with patch("hermes_cli.commands.resolve_command", return_value=SimpleNamespace(name="help")):
assert HermesCLI._should_handle_model_command_inline(cli, "/model") is False
assert HermesCLI._should_handle_model_command_inline(cli, "/model", has_images=True) is False
def test_process_command_model_without_args_opens_modal_picker_and_captures_draft():
from cli import HermesCLI
cli = _make_modal_cli()
providers = _make_providers()
with (
patch("hermes_cli.model_switch.list_authenticated_providers", return_value=providers),
patch("cli._cprint"),
):
result = cli.process_command("/model")
assert result is True
assert cli._model_picker_state is not None
assert cli._model_picker_state["stage"] == "provider"
assert cli._model_picker_state["selected"] == 0
assert cli._modal_input_snapshot == {"text": "draft text", "cursor_position": len("draft text")}
assert cli._app.current_buffer.text == ""
def test_model_picker_provider_then_model_selection_applies_switch_result_and_restores_draft():
from cli import HermesCLI
cli = _make_modal_cli()
providers = _make_providers()
with (
patch("hermes_cli.model_switch.list_authenticated_providers", return_value=providers),
patch("cli._cprint"),
):
assert cli.process_command("/model") is True
cli._model_picker_state["selected"] = 1
with patch("hermes_cli.models.provider_model_ids", return_value=["claude-opus-4.6", "claude-sonnet-4.6"]):
HermesCLI._handle_model_picker_selection(cli)
assert cli._model_picker_state["stage"] == "model"
assert cli._model_picker_state["provider_data"]["slug"] == "anthropic"
assert cli._model_picker_state["model_list"] == ["claude-opus-4.6", "claude-sonnet-4.6"]
cli._model_picker_state["selected"] = 0
switch_result = SimpleNamespace(
success=True,
error_message=None,
new_model="claude-opus-4.6",
target_provider="anthropic",
api_key="",
base_url="",
api_mode="anthropic_messages",
provider_label="Anthropic",
model_info=None,
warning_message=None,
provider_changed=True,
)
with (
patch("hermes_cli.model_switch.switch_model", return_value=switch_result) as switch_mock,
patch("cli._cprint"),
):
HermesCLI._handle_model_picker_selection(cli)
assert cli._model_picker_state is None
assert cli.model == "claude-opus-4.6"
assert cli.provider == "anthropic"
assert cli.requested_provider == "anthropic"
assert cli._app.current_buffer.text == "draft text"
switch_mock.assert_called_once()
assert switch_mock.call_args.kwargs["explicit_provider"] == "anthropic"

View file

@ -0,0 +1,241 @@
"""Regression test: openai-codex must appear in /model picker when
credentials are only in the Codex CLI shared file (~/.codex/auth.json)
and haven't been migrated to the Hermes auth store yet.
Root cause: list_authenticated_providers() checked the raw Hermes auth
store but didn't know about the Codex CLI fallback import path.
Fix: _seed_from_singletons() now imports from the Codex CLI when the
Hermes auth store has no openai-codex tokens, and
list_authenticated_providers() falls back to load_pool() for OAuth
providers.
"""
import base64
import json
import os
import sys
import time
from pathlib import Path
from unittest.mock import patch
import pytest
def _make_fake_jwt(expiry_offset: int = 3600) -> str:
"""Build a fake JWT with a future expiry."""
header = base64.urlsafe_b64encode(b'{"alg":"RS256"}').rstrip(b"=").decode()
exp = int(time.time()) + expiry_offset
payload_bytes = json.dumps({"exp": exp, "sub": "test"}).encode()
payload = base64.urlsafe_b64encode(payload_bytes).rstrip(b"=").decode()
return f"{header}.{payload}.fakesig"
@pytest.fixture()
def codex_cli_only_env(tmp_path, monkeypatch):
"""Set up an environment where Codex tokens exist only in ~/.codex/auth.json,
NOT in the Hermes auth store."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
codex_home = tmp_path / ".codex"
codex_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("CODEX_HOME", str(codex_home))
# Empty Hermes auth store
(hermes_home / "auth.json").write_text(
json.dumps({"version": 2, "providers": {}})
)
# Valid Codex CLI tokens
fake_jwt = _make_fake_jwt()
(codex_home / "auth.json").write_text(
json.dumps({
"tokens": {
"access_token": fake_jwt,
"refresh_token": "fake-refresh-token",
}
})
)
# Clear provider env vars so only OAuth is a detection path
for var in [
"OPENROUTER_API_KEY", "OPENAI_API_KEY", "ANTHROPIC_API_KEY",
"NOUS_API_KEY", "DEEPSEEK_API_KEY", "COPILOT_GITHUB_TOKEN",
"GH_TOKEN", "GEMINI_API_KEY",
]:
monkeypatch.delenv(var, raising=False)
return hermes_home
def test_codex_cli_tokens_detected_by_model_picker(codex_cli_only_env):
"""openai-codex should appear when tokens only exist in ~/.codex/auth.json."""
from hermes_cli.model_switch import list_authenticated_providers
providers = list_authenticated_providers(
current_provider="openai-codex",
max_models=10,
)
slugs = [p["slug"] for p in providers]
assert "openai-codex" in slugs, (
f"openai-codex not found in /model picker providers: {slugs}"
)
codex = next(p for p in providers if p["slug"] == "openai-codex")
assert codex["is_current"] is True
assert codex["total_models"] > 0
def test_codex_cli_tokens_migrated_after_detection(codex_cli_only_env):
"""After the /model picker detects Codex CLI tokens, they should be
migrated into the Hermes auth store for subsequent fast lookups."""
from hermes_cli.model_switch import list_authenticated_providers
# First call triggers migration
list_authenticated_providers(current_provider="openai-codex")
# Verify tokens are now in Hermes auth store
auth_path = codex_cli_only_env / "auth.json"
store = json.loads(auth_path.read_text())
providers = store.get("providers", {})
assert "openai-codex" in providers, (
f"openai-codex not migrated to Hermes auth store: {list(providers.keys())}"
)
tokens = providers["openai-codex"].get("tokens", {})
assert tokens.get("access_token"), "access_token missing after migration"
assert tokens.get("refresh_token"), "refresh_token missing after migration"
@pytest.fixture()
def hermes_auth_only_env(tmp_path, monkeypatch):
"""Tokens already in Hermes auth store (no Codex CLI needed)."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
# Point CODEX_HOME to nonexistent dir to prove it's not needed
monkeypatch.setenv("CODEX_HOME", str(tmp_path / "no_codex"))
(hermes_home / "auth.json").write_text(json.dumps({
"version": 2,
"providers": {
"openai-codex": {
"tokens": {
"access_token": _make_fake_jwt(),
"refresh_token": "fake-refresh",
},
"last_refresh": "2026-04-12T00:00:00Z",
}
},
}))
for var in [
"OPENROUTER_API_KEY", "OPENAI_API_KEY", "ANTHROPIC_API_KEY",
"NOUS_API_KEY", "DEEPSEEK_API_KEY",
]:
monkeypatch.delenv(var, raising=False)
return hermes_home
def test_normal_path_still_works(hermes_auth_only_env):
"""openai-codex appears when tokens are already in Hermes auth store."""
from hermes_cli.model_switch import list_authenticated_providers
providers = list_authenticated_providers(
current_provider="openai-codex",
max_models=10,
)
slugs = [p["slug"] for p in providers]
assert "openai-codex" in slugs
@pytest.fixture()
def claude_code_only_env(tmp_path, monkeypatch):
"""Set up an environment where Anthropic credentials only exist in
~/.claude/.credentials.json (Claude Code) not in env vars or Hermes
auth store."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
# No Codex CLI
monkeypatch.setenv("CODEX_HOME", str(tmp_path / "no_codex"))
(hermes_home / "auth.json").write_text(
json.dumps({"version": 2, "providers": {}})
)
# Claude Code credentials in the correct format
claude_dir = tmp_path / ".claude"
claude_dir.mkdir()
(claude_dir / ".credentials.json").write_text(json.dumps({
"claudeAiOauth": {
"accessToken": _make_fake_jwt(),
"refreshToken": "fake-refresh",
"expiresAt": int(time.time() * 1000) + 3_600_000,
}
}))
# Patch Path.home() so the adapter finds the file
monkeypatch.setattr(Path, "home", classmethod(lambda cls: tmp_path))
for var in [
"OPENROUTER_API_KEY", "OPENAI_API_KEY", "ANTHROPIC_API_KEY",
"ANTHROPIC_TOKEN", "CLAUDE_CODE_OAUTH_TOKEN",
"NOUS_API_KEY", "DEEPSEEK_API_KEY",
]:
monkeypatch.delenv(var, raising=False)
return hermes_home
def test_claude_code_file_detected_by_model_picker(claude_code_only_env):
"""anthropic should appear when credentials only exist in ~/.claude/.credentials.json."""
from hermes_cli.model_switch import list_authenticated_providers
providers = list_authenticated_providers(
current_provider="anthropic",
max_models=10,
)
slugs = [p["slug"] for p in providers]
assert "anthropic" in slugs, (
f"anthropic not found in /model picker providers: {slugs}"
)
anthropic = next(p for p in providers if p["slug"] == "anthropic")
assert anthropic["is_current"] is True
assert anthropic["total_models"] > 0
def test_no_codex_when_no_credentials(tmp_path, monkeypatch):
"""openai-codex should NOT appear when no credentials exist anywhere."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("CODEX_HOME", str(tmp_path / "no_codex"))
(hermes_home / "auth.json").write_text(
json.dumps({"version": 2, "providers": {}})
)
for var in [
"OPENROUTER_API_KEY", "OPENAI_API_KEY", "ANTHROPIC_API_KEY",
"NOUS_API_KEY", "DEEPSEEK_API_KEY", "COPILOT_GITHUB_TOKEN",
"GH_TOKEN", "GEMINI_API_KEY",
]:
monkeypatch.delenv(var, raising=False)
from hermes_cli.model_switch import list_authenticated_providers
providers = list_authenticated_providers(
current_provider="openrouter",
max_models=10,
)
slugs = [p["slug"] for p in providers]
assert "openai-codex" not in slugs, (
"openai-codex should not appear without any credentials"
)

View file

@ -68,6 +68,7 @@ class TestLoadConfigDefaults:
assert "max_turns" not in config
assert "terminal" in config
assert config["terminal"]["backend"] == "local"
assert config["display"]["interim_assistant_messages"] is True
def test_legacy_root_level_max_turns_migrates_to_agent_config(self, tmp_path):
with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path)}):
@ -421,3 +422,25 @@ class TestAnthropicTokenMigration:
}):
migrate_config(interactive=False, quiet=True)
assert load_env().get("ANTHROPIC_TOKEN") == "current-token"
class TestInterimAssistantMessageConfig:
"""Test the explicit gateway interim-message config gate."""
def test_default_config_enables_interim_assistant_messages(self):
assert DEFAULT_CONFIG["display"]["interim_assistant_messages"] is True
def test_migrate_to_v15_adds_interim_assistant_message_gate(self, tmp_path):
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump({"_config_version": 14, "display": {"tool_progress": "off"}}),
encoding="utf-8",
)
with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path)}):
migrate_config(interactive=False, quiet=True)
raw = yaml.safe_load(config_path.read_text(encoding="utf-8"))
assert raw["_config_version"] == 16
assert raw["display"]["tool_progress"] == "off"
assert raw["display"]["interim_assistant_messages"] is True

View file

@ -0,0 +1,342 @@
"""Tests for container-aware CLI routing (NixOS container mode).
When container.enable = true in the NixOS module, the activation script
writes a .container-mode metadata file. The host CLI detects this and
execs into the container instead of running locally.
"""
import os
import subprocess
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from hermes_cli.config import (
_is_inside_container,
get_container_exec_info,
)
# =============================================================================
# _is_inside_container
# =============================================================================
def test_is_inside_container_dockerenv():
"""Detects /.dockerenv marker file."""
with patch("os.path.exists") as mock_exists:
mock_exists.side_effect = lambda p: p == "/.dockerenv"
assert _is_inside_container() is True
def test_is_inside_container_containerenv():
"""Detects Podman's /run/.containerenv marker."""
with patch("os.path.exists") as mock_exists:
mock_exists.side_effect = lambda p: p == "/run/.containerenv"
assert _is_inside_container() is True
def test_is_inside_container_cgroup_docker():
"""Detects 'docker' in /proc/1/cgroup."""
with patch("os.path.exists", return_value=False), \
patch("builtins.open", create=True) as mock_open:
mock_open.return_value.__enter__ = lambda s: s
mock_open.return_value.__exit__ = MagicMock(return_value=False)
mock_open.return_value.read = MagicMock(
return_value="12:memory:/docker/abc123\n"
)
assert _is_inside_container() is True
def test_is_inside_container_false_on_host():
"""Returns False when none of the container indicators are present."""
with patch("os.path.exists", return_value=False), \
patch("builtins.open", side_effect=OSError("no such file")):
assert _is_inside_container() is False
# =============================================================================
# get_container_exec_info
# =============================================================================
@pytest.fixture
def container_env(tmp_path, monkeypatch):
"""Set up a fake HERMES_HOME with .container-mode file."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.delenv("HERMES_DEV", raising=False)
container_mode = hermes_home / ".container-mode"
container_mode.write_text(
"# Written by NixOS activation script. Do not edit manually.\n"
"backend=podman\n"
"container_name=hermes-agent\n"
"exec_user=hermes\n"
"hermes_bin=/data/current-package/bin/hermes\n"
)
return hermes_home
def test_get_container_exec_info_returns_metadata(container_env):
"""Reads .container-mode and returns all fields including exec_user."""
with patch("hermes_cli.config._is_inside_container", return_value=False):
info = get_container_exec_info()
assert info is not None
assert info["backend"] == "podman"
assert info["container_name"] == "hermes-agent"
assert info["exec_user"] == "hermes"
assert info["hermes_bin"] == "/data/current-package/bin/hermes"
def test_get_container_exec_info_none_inside_container(container_env):
"""Returns None when we're already inside a container."""
with patch("hermes_cli.config._is_inside_container", return_value=True):
info = get_container_exec_info()
assert info is None
def test_get_container_exec_info_none_without_file(tmp_path, monkeypatch):
"""Returns None when .container-mode doesn't exist (native mode)."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.delenv("HERMES_DEV", raising=False)
with patch("hermes_cli.config._is_inside_container", return_value=False):
info = get_container_exec_info()
assert info is None
def test_get_container_exec_info_skipped_when_hermes_dev(container_env, monkeypatch):
"""Returns None when HERMES_DEV=1 is set (dev mode bypass)."""
monkeypatch.setenv("HERMES_DEV", "1")
with patch("hermes_cli.config._is_inside_container", return_value=False):
info = get_container_exec_info()
assert info is None
def test_get_container_exec_info_not_skipped_when_hermes_dev_zero(container_env, monkeypatch):
"""HERMES_DEV=0 does NOT trigger bypass — only '1' does."""
monkeypatch.setenv("HERMES_DEV", "0")
with patch("hermes_cli.config._is_inside_container", return_value=False):
info = get_container_exec_info()
assert info is not None
def test_get_container_exec_info_defaults():
"""Falls back to defaults for missing keys."""
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
hermes_home = Path(tmpdir) / ".hermes"
hermes_home.mkdir()
(hermes_home / ".container-mode").write_text(
"# minimal file with no keys\n"
)
with patch("hermes_cli.config._is_inside_container", return_value=False), \
patch("hermes_cli.config.get_hermes_home", return_value=hermes_home), \
patch.dict(os.environ, {}, clear=False):
os.environ.pop("HERMES_DEV", None)
info = get_container_exec_info()
assert info is not None
assert info["backend"] == "docker"
assert info["container_name"] == "hermes-agent"
assert info["exec_user"] == "hermes"
assert info["hermes_bin"] == "/data/current-package/bin/hermes"
def test_get_container_exec_info_docker_backend(container_env):
"""Correctly reads docker backend with custom exec_user."""
(container_env / ".container-mode").write_text(
"backend=docker\n"
"container_name=hermes-custom\n"
"exec_user=myuser\n"
"hermes_bin=/opt/hermes/bin/hermes\n"
)
with patch("hermes_cli.config._is_inside_container", return_value=False):
info = get_container_exec_info()
assert info["backend"] == "docker"
assert info["container_name"] == "hermes-custom"
assert info["exec_user"] == "myuser"
assert info["hermes_bin"] == "/opt/hermes/bin/hermes"
def test_get_container_exec_info_crashes_on_permission_error(container_env):
"""PermissionError propagates instead of being silently swallowed."""
with patch("hermes_cli.config._is_inside_container", return_value=False), \
patch("builtins.open", side_effect=PermissionError("permission denied")):
with pytest.raises(PermissionError):
get_container_exec_info()
# =============================================================================
# _exec_in_container
# =============================================================================
@pytest.fixture
def docker_container_info():
return {
"backend": "docker",
"container_name": "hermes-agent",
"exec_user": "hermes",
"hermes_bin": "/data/current-package/bin/hermes",
}
@pytest.fixture
def podman_container_info():
return {
"backend": "podman",
"container_name": "hermes-agent",
"exec_user": "hermes",
"hermes_bin": "/data/current-package/bin/hermes",
}
def test_exec_in_container_calls_execvp(docker_container_info):
"""Verifies os.execvp is called with correct args: runtime, tty flags,
user, env vars, container name, binary, and CLI args."""
from hermes_cli.main import _exec_in_container
with patch("shutil.which", return_value="/usr/bin/docker"), \
patch("subprocess.run") as mock_run, \
patch("sys.stdin") as mock_stdin, \
patch("os.execvp") as mock_execvp, \
patch.dict(os.environ, {"TERM": "xterm-256color", "LANG": "en_US.UTF-8"},
clear=False):
mock_stdin.isatty.return_value = True
mock_run.return_value = MagicMock(returncode=0)
_exec_in_container(docker_container_info, ["chat", "-m", "opus"])
mock_execvp.assert_called_once()
cmd = mock_execvp.call_args[0][1]
assert cmd[0] == "/usr/bin/docker"
assert cmd[1] == "exec"
assert "-it" in cmd
idx_u = cmd.index("-u")
assert cmd[idx_u + 1] == "hermes"
e_indices = [i for i, v in enumerate(cmd) if v == "-e"]
e_values = [cmd[i + 1] for i in e_indices]
assert "TERM=xterm-256color" in e_values
assert "LANG=en_US.UTF-8" in e_values
assert "hermes-agent" in cmd
assert "/data/current-package/bin/hermes" in cmd
assert "chat" in cmd
def test_exec_in_container_non_tty_uses_i_only(docker_container_info):
"""Non-TTY mode uses -i instead of -it."""
from hermes_cli.main import _exec_in_container
with patch("shutil.which", return_value="/usr/bin/docker"), \
patch("subprocess.run") as mock_run, \
patch("sys.stdin") as mock_stdin, \
patch("os.execvp") as mock_execvp:
mock_stdin.isatty.return_value = False
mock_run.return_value = MagicMock(returncode=0)
_exec_in_container(docker_container_info, ["sessions", "list"])
cmd = mock_execvp.call_args[0][1]
assert "-i" in cmd
assert "-it" not in cmd
def test_exec_in_container_no_runtime_hard_fails(podman_container_info):
"""Hard fails when runtime not found (no fallback)."""
from hermes_cli.main import _exec_in_container
with patch("shutil.which", return_value=None), \
patch("subprocess.run") as mock_run, \
patch("os.execvp") as mock_execvp, \
pytest.raises(SystemExit) as exc_info:
_exec_in_container(podman_container_info, ["chat"])
mock_run.assert_not_called()
mock_execvp.assert_not_called()
assert exc_info.value.code != 0
def test_exec_in_container_sudo_probe_sets_prefix(podman_container_info):
"""When first probe fails and sudo probe succeeds, execvp is called
with sudo -n prefix."""
from hermes_cli.main import _exec_in_container
def which_side_effect(name):
if name == "podman":
return "/usr/bin/podman"
if name == "sudo":
return "/usr/bin/sudo"
return None
with patch("shutil.which", side_effect=which_side_effect), \
patch("subprocess.run") as mock_run, \
patch("sys.stdin") as mock_stdin, \
patch("os.execvp") as mock_execvp:
mock_stdin.isatty.return_value = True
mock_run.side_effect = [
MagicMock(returncode=1), # direct probe fails
MagicMock(returncode=0), # sudo probe succeeds
]
_exec_in_container(podman_container_info, ["chat"])
mock_execvp.assert_called_once()
cmd = mock_execvp.call_args[0][1]
assert cmd[0] == "/usr/bin/sudo"
assert cmd[1] == "-n"
assert cmd[2] == "/usr/bin/podman"
assert cmd[3] == "exec"
def test_exec_in_container_probe_timeout_prints_message(docker_container_info):
"""TimeoutExpired from probe produces a human-readable error, not a
raw traceback."""
from hermes_cli.main import _exec_in_container
with patch("shutil.which", return_value="/usr/bin/docker"), \
patch("subprocess.run", side_effect=subprocess.TimeoutExpired(
cmd=["docker", "inspect"], timeout=15)), \
patch("os.execvp") as mock_execvp, \
pytest.raises(SystemExit) as exc_info:
_exec_in_container(docker_container_info, ["chat"])
mock_execvp.assert_not_called()
assert exc_info.value.code == 1
def test_exec_in_container_container_not_running_no_sudo(docker_container_info):
"""When runtime exists but container not found and no sudo available,
prints helpful error about root containers."""
from hermes_cli.main import _exec_in_container
def which_side_effect(name):
if name == "docker":
return "/usr/bin/docker"
return None
with patch("shutil.which", side_effect=which_side_effect), \
patch("subprocess.run") as mock_run, \
patch("os.execvp") as mock_execvp, \
pytest.raises(SystemExit) as exc_info:
mock_run.return_value = MagicMock(returncode=1)
_exec_in_container(docker_container_info, ["chat"])
mock_execvp.assert_not_called()
assert exc_info.value.code == 1

View file

@ -122,3 +122,54 @@ class TestCustomProviderModelSwitch:
model = config.get("model")
assert isinstance(model, dict)
assert model["default"] == "model-X"
def test_api_mode_set_from_provider_info(self, config_home):
"""When custom_providers entry has api_mode, it should be applied."""
import yaml
from hermes_cli.main import _model_flow_named_custom
provider_info = {
"name": "Anthropic Proxy",
"base_url": "https://proxy.example.com/anthropic",
"api_key": "***",
"model": "claude-3",
"api_mode": "anthropic_messages",
}
with patch("hermes_cli.models.fetch_api_models", return_value=["claude-3"]), \
patch.dict("sys.modules", {"simple_term_menu": None}), \
patch("builtins.input", return_value="1"), \
patch("builtins.print"):
_model_flow_named_custom({}, provider_info)
config = yaml.safe_load((config_home / "config.yaml").read_text()) or {}
model = config.get("model")
assert isinstance(model, dict)
assert model.get("api_mode") == "anthropic_messages"
def test_api_mode_cleared_when_not_specified(self, config_home):
"""When custom_providers entry has no api_mode, stale api_mode is removed."""
import yaml
from hermes_cli.main import _model_flow_named_custom
# Pre-seed a stale api_mode in config
config_path = config_home / "config.yaml"
config_path.write_text(yaml.dump({"model": {"api_mode": "anthropic_messages"}}))
provider_info = {
"name": "My vLLM",
"base_url": "https://vllm.example.com/v1",
"api_key": "***",
"model": "llama-3",
}
with patch("hermes_cli.models.fetch_api_models", return_value=["llama-3"]), \
patch.dict("sys.modules", {"simple_term_menu": None}), \
patch("builtins.input", return_value="1"), \
patch("builtins.print"):
_model_flow_named_custom({}, provider_info)
config = yaml.safe_load((config_home / "config.yaml").read_text()) or {}
model = config.get("model")
assert isinstance(model, dict)
assert "api_mode" not in model, "Stale api_mode should be removed"

View file

@ -1,288 +1,255 @@
"""Tests for hermes_cli/logs.py — log viewing and filtering."""
"""Tests for hermes_cli.logs — log viewing and filtering."""
import os
import textwrap
from datetime import datetime, timedelta
from io import StringIO
from pathlib import Path
from unittest.mock import patch
import pytest
from hermes_cli.logs import (
LOG_FILES,
_extract_level,
_extract_logger_name,
_line_matches_component,
_matches_filters,
_parse_line_timestamp,
_parse_since,
_read_last_n_lines,
list_logs,
tail_log,
_read_tail,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def log_dir(tmp_path, monkeypatch):
"""Create a fake HERMES_HOME with a logs/ directory."""
home = Path(os.environ["HERMES_HOME"])
logs = home / "logs"
logs.mkdir(parents=True, exist_ok=True)
return logs
@pytest.fixture
def sample_agent_log(log_dir):
"""Write a realistic agent.log with mixed levels and sessions."""
lines = textwrap.dedent("""\
2026-04-05 10:00:00,000 INFO run_agent: conversation turn: session=sess_aaa model=claude provider=openrouter platform=cli history=0 msg='hello'
2026-04-05 10:00:01,000 INFO run_agent: tool terminal completed (0.50s, 200 chars)
2026-04-05 10:00:02,000 INFO run_agent: API call #1: model=claude provider=openrouter in=1000 out=200 total=1200 latency=1.5s
2026-04-05 10:00:03,000 WARNING run_agent: Tool web_search returned error (2.00s): timeout
2026-04-05 10:00:04,000 INFO run_agent: conversation turn: session=sess_bbb model=gpt-5 provider=openai platform=telegram history=5 msg='fix bug'
2026-04-05 10:00:05,000 ERROR run_agent: API call failed after 3 retries. rate limited
2026-04-05 10:00:06,000 INFO run_agent: tool read_file completed (0.01s, 500 chars)
2026-04-05 10:00:07,000 DEBUG run_agent: verbose internal detail
2026-04-05 10:00:08,000 INFO credential_pool: credential pool: marking key-1 exhausted (status=429), rotating
2026-04-05 10:00:09,000 INFO credential_pool: credential pool: rotated to key-2
""")
path = log_dir / "agent.log"
path.write_text(lines)
return path
@pytest.fixture
def sample_errors_log(log_dir):
"""Write a small errors.log."""
lines = textwrap.dedent("""\
2026-04-05 10:00:03,000 WARNING run_agent: Tool web_search returned error (2.00s): timeout
2026-04-05 10:00:05,000 ERROR run_agent: API call failed after 3 retries. rate limited
""")
path = log_dir / "errors.log"
path.write_text(lines)
return path
# ---------------------------------------------------------------------------
# _parse_since
# Timestamp parsing
# ---------------------------------------------------------------------------
class TestParseSince:
def test_hours(self):
cutoff = _parse_since("2h")
assert cutoff is not None
assert (datetime.now() - cutoff).total_seconds() == pytest.approx(7200, abs=5)
assert abs((datetime.now() - cutoff).total_seconds() - 7200) < 2
def test_minutes(self):
cutoff = _parse_since("30m")
assert cutoff is not None
assert (datetime.now() - cutoff).total_seconds() == pytest.approx(1800, abs=5)
assert abs((datetime.now() - cutoff).total_seconds() - 1800) < 2
def test_days(self):
cutoff = _parse_since("1d")
assert cutoff is not None
assert (datetime.now() - cutoff).total_seconds() == pytest.approx(86400, abs=5)
assert abs((datetime.now() - cutoff).total_seconds() - 86400) < 2
def test_seconds(self):
cutoff = _parse_since("60s")
cutoff = _parse_since("120s")
assert cutoff is not None
assert (datetime.now() - cutoff).total_seconds() == pytest.approx(60, abs=5)
assert abs((datetime.now() - cutoff).total_seconds() - 120) < 2
def test_invalid_returns_none(self):
assert _parse_since("abc") is None
assert _parse_since("") is None
assert _parse_since("10x") is None
def test_whitespace_handling(self):
cutoff = _parse_since(" 1h ")
def test_whitespace_tolerance(self):
cutoff = _parse_since(" 5m ")
assert cutoff is not None
# ---------------------------------------------------------------------------
# _parse_line_timestamp
# ---------------------------------------------------------------------------
class TestParseLineTimestamp:
def test_standard_format(self):
ts = _parse_line_timestamp("2026-04-05 10:00:00,123 INFO something")
assert ts is not None
assert ts.year == 2026
assert ts.hour == 10
ts = _parse_line_timestamp("2026-04-11 10:23:45 INFO gateway.run: msg")
assert ts == datetime(2026, 4, 11, 10, 23, 45)
def test_no_timestamp(self):
assert _parse_line_timestamp("just some text") is None
assert _parse_line_timestamp("no timestamp here") is None
def test_continuation_line(self):
assert _parse_line_timestamp(" at module.function (line 42)") is None
# ---------------------------------------------------------------------------
# _extract_level
# ---------------------------------------------------------------------------
class TestExtractLevel:
def test_info(self):
assert _extract_level("2026-04-05 10:00:00 INFO run_agent: something") == "INFO"
assert _extract_level("2026-01-01 00:00:00 INFO gateway.run: msg") == "INFO"
def test_warning(self):
assert _extract_level("2026-04-05 10:00:00 WARNING run_agent: bad") == "WARNING"
assert _extract_level("2026-01-01 00:00:00 WARNING tools.file: msg") == "WARNING"
def test_error(self):
assert _extract_level("2026-04-05 10:00:00 ERROR run_agent: crash") == "ERROR"
assert _extract_level("2026-01-01 00:00:00 ERROR run_agent: msg") == "ERROR"
def test_debug(self):
assert _extract_level("2026-04-05 10:00:00 DEBUG run_agent: detail") == "DEBUG"
assert _extract_level("2026-01-01 00:00:00 DEBUG agent.aux: msg") == "DEBUG"
def test_no_level(self):
assert _extract_level("just a plain line") is None
assert _extract_level("random text") is None
# ---------------------------------------------------------------------------
# _matches_filters
# Logger name extraction (new for component filtering)
# ---------------------------------------------------------------------------
class TestExtractLoggerName:
def test_standard_line(self):
line = "2026-04-11 10:23:45 INFO gateway.run: Starting gateway"
assert _extract_logger_name(line) == "gateway.run"
def test_nested_logger(self):
line = "2026-04-11 10:23:45 INFO gateway.platforms.telegram: connected"
assert _extract_logger_name(line) == "gateway.platforms.telegram"
def test_warning_level(self):
line = "2026-04-11 10:23:45 WARNING tools.terminal_tool: timeout"
assert _extract_logger_name(line) == "tools.terminal_tool"
def test_with_session_tag(self):
line = "2026-04-11 10:23:45 INFO [abc123] tools.file_tools: reading file"
assert _extract_logger_name(line) == "tools.file_tools"
def test_with_session_tag_and_error(self):
line = "2026-04-11 10:23:45 ERROR [sess_xyz] agent.context_compressor: failed"
assert _extract_logger_name(line) == "agent.context_compressor"
def test_top_level_module(self):
line = "2026-04-11 10:23:45 INFO run_agent: starting conversation"
assert _extract_logger_name(line) == "run_agent"
def test_no_match(self):
assert _extract_logger_name("random text") is None
class TestLineMatchesComponent:
def test_gateway_component(self):
line = "2026-04-11 10:23:45 INFO gateway.run: msg"
assert _line_matches_component(line, ("gateway",))
def test_gateway_nested(self):
line = "2026-04-11 10:23:45 INFO gateway.platforms.telegram: msg"
assert _line_matches_component(line, ("gateway",))
def test_tools_component(self):
line = "2026-04-11 10:23:45 INFO tools.terminal_tool: msg"
assert _line_matches_component(line, ("tools",))
def test_agent_with_multiple_prefixes(self):
prefixes = ("agent", "run_agent", "model_tools")
assert _line_matches_component(
"2026-04-11 10:23:45 INFO agent.context_compressor: msg", prefixes)
assert _line_matches_component(
"2026-04-11 10:23:45 INFO run_agent: msg", prefixes)
assert _line_matches_component(
"2026-04-11 10:23:45 INFO model_tools: msg", prefixes)
def test_no_match(self):
line = "2026-04-11 10:23:45 INFO tools.browser: msg"
assert not _line_matches_component(line, ("gateway",))
def test_with_session_tag(self):
line = "2026-04-11 10:23:45 INFO [abc] gateway.run: msg"
assert _line_matches_component(line, ("gateway",))
def test_unparseable_line(self):
assert not _line_matches_component("random text", ("gateway",))
# ---------------------------------------------------------------------------
# Combined filter
# ---------------------------------------------------------------------------
class TestMatchesFilters:
def test_no_filters_always_matches(self):
assert _matches_filters("any line") is True
def test_no_filters_passes_everything(self):
assert _matches_filters("any line")
def test_level_filter_passes(self):
def test_level_filter(self):
assert _matches_filters(
"2026-04-05 10:00:00 WARNING something",
min_level="WARNING",
) is True
"2026-01-01 00:00:00 WARNING x: msg", min_level="WARNING")
assert not _matches_filters(
"2026-01-01 00:00:00 INFO x: msg", min_level="WARNING")
def test_level_filter_rejects(self):
def test_session_filter(self):
assert _matches_filters(
"2026-04-05 10:00:00 INFO something",
min_level="WARNING",
) is False
"2026-01-01 00:00:00 INFO [abc123] x: msg", session_filter="abc123")
assert not _matches_filters(
"2026-01-01 00:00:00 INFO [xyz789] x: msg", session_filter="abc123")
def test_session_filter_passes(self):
def test_component_filter(self):
assert _matches_filters(
"session=sess_aaa model=claude",
session_filter="sess_aaa",
) is True
def test_session_filter_rejects(self):
assert _matches_filters(
"session=sess_aaa model=claude",
session_filter="sess_bbb",
) is False
def test_since_filter_passes(self):
# Line from the future should always pass
assert _matches_filters(
"2099-01-01 00:00:00 INFO future",
since=datetime.now(),
) is True
def test_since_filter_rejects(self):
assert _matches_filters(
"2020-01-01 00:00:00 INFO past",
since=datetime.now(),
) is False
"2026-01-01 00:00:00 INFO gateway.run: msg",
component_prefixes=("gateway",))
assert not _matches_filters(
"2026-01-01 00:00:00 INFO tools.file: msg",
component_prefixes=("gateway",))
def test_combined_filters(self):
line = "2099-01-01 00:00:00 WARNING run_agent: session=abc error"
"""All filters must pass for a line to match."""
line = "2026-04-11 10:00:00 WARNING [sess_1] gateway.run: connection lost"
assert _matches_filters(
line, min_level="WARNING", session_filter="abc",
since=datetime.now(),
) is True
# Fails session filter
line,
min_level="WARNING",
session_filter="sess_1",
component_prefixes=("gateway",),
)
# Fails component filter
assert not _matches_filters(
line,
min_level="WARNING",
session_filter="sess_1",
component_prefixes=("tools",),
)
def test_since_filter(self):
# Line with a very old timestamp should be filtered out
assert not _matches_filters(
"2020-01-01 00:00:00 INFO x: old msg",
since=datetime.now() - timedelta(hours=1))
# Line with a recent timestamp should pass
recent = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
assert _matches_filters(
line, min_level="WARNING", session_filter="xyz",
) is False
f"{recent} INFO x: recent msg",
since=datetime.now() - timedelta(hours=1))
# ---------------------------------------------------------------------------
# _read_last_n_lines
# File reading
# ---------------------------------------------------------------------------
class TestReadLastNLines:
def test_reads_correct_count(self, sample_agent_log):
lines = _read_last_n_lines(sample_agent_log, 3)
assert len(lines) == 3
class TestReadTail:
def test_read_small_file(self, tmp_path):
log_file = tmp_path / "test.log"
lines = [f"2026-01-01 00:00:0{i} INFO x: line {i}\n" for i in range(10)]
log_file.write_text("".join(lines))
def test_reads_all_when_fewer(self, sample_agent_log):
lines = _read_last_n_lines(sample_agent_log, 100)
assert len(lines) == 10 # sample has 10 lines
result = _read_last_n_lines(log_file, 5)
assert len(result) == 5
assert "line 9" in result[-1]
def test_empty_file(self, log_dir):
empty = log_dir / "empty.log"
empty.write_text("")
lines = _read_last_n_lines(empty, 10)
assert lines == []
def test_read_with_component_filter(self, tmp_path):
log_file = tmp_path / "test.log"
lines = [
"2026-01-01 00:00:00 INFO gateway.run: gw msg\n",
"2026-01-01 00:00:01 INFO tools.file: tool msg\n",
"2026-01-01 00:00:02 INFO gateway.session: session msg\n",
"2026-01-01 00:00:03 INFO agent.compressor: agent msg\n",
]
log_file.write_text("".join(lines))
def test_last_line_content(self, sample_agent_log):
lines = _read_last_n_lines(sample_agent_log, 1)
assert "rotated to key-2" in lines[0]
result = _read_tail(
log_file, 50,
has_filters=True,
component_prefixes=("gateway",),
)
assert len(result) == 2
assert "gw msg" in result[0]
assert "session msg" in result[1]
def test_empty_file(self, tmp_path):
log_file = tmp_path / "empty.log"
log_file.write_text("")
result = _read_last_n_lines(log_file, 10)
assert result == []
# ---------------------------------------------------------------------------
# tail_log
# LOG_FILES registry
# ---------------------------------------------------------------------------
class TestTailLog:
def test_basic_tail(self, sample_agent_log, capsys):
tail_log("agent", num_lines=3)
captured = capsys.readouterr()
assert "agent.log" in captured.out
# Should have the header + 3 lines
lines = captured.out.strip().split("\n")
assert len(lines) == 4 # 1 header + 3 content
def test_level_filter(self, sample_agent_log, capsys):
tail_log("agent", num_lines=50, level="ERROR")
captured = capsys.readouterr()
assert "level>=ERROR" in captured.out
# Only the ERROR line should appear
content_lines = [l for l in captured.out.strip().split("\n") if not l.startswith("---")]
assert len(content_lines) == 1
assert "API call failed" in content_lines[0]
def test_session_filter(self, sample_agent_log, capsys):
tail_log("agent", num_lines=50, session="sess_bbb")
captured = capsys.readouterr()
content_lines = [l for l in captured.out.strip().split("\n") if not l.startswith("---")]
assert len(content_lines) == 1
assert "sess_bbb" in content_lines[0]
def test_errors_log(self, sample_errors_log, capsys):
tail_log("errors", num_lines=10)
captured = capsys.readouterr()
assert "errors.log" in captured.out
assert "WARNING" in captured.out or "ERROR" in captured.out
def test_unknown_log_exits(self):
with pytest.raises(SystemExit):
tail_log("nonexistent")
def test_missing_file_exits(self, log_dir):
with pytest.raises(SystemExit):
tail_log("agent") # agent.log doesn't exist in clean log_dir
# ---------------------------------------------------------------------------
# list_logs
# ---------------------------------------------------------------------------
class TestListLogs:
def test_lists_files(self, sample_agent_log, sample_errors_log, capsys):
list_logs()
captured = capsys.readouterr()
assert "agent.log" in captured.out
assert "errors.log" in captured.out
def test_empty_dir(self, log_dir, capsys):
list_logs()
captured = capsys.readouterr()
assert "no log files yet" in captured.out
def test_shows_sizes(self, sample_agent_log, capsys):
list_logs()
captured = capsys.readouterr()
# File is small, should show as bytes or KB
assert "B" in captured.out or "KB" in captured.out
class TestLogFiles:
def test_known_log_files(self):
assert "agent" in LOG_FILES
assert "errors" in LOG_FILES
assert "gateway" in LOG_FILES

View file

@ -46,6 +46,8 @@ def _make_args(**kwargs):
"command": None,
"args": None,
"auth": None,
"preset": None,
"env": None,
"mcp_action": None,
}
defaults.update(kwargs)
@ -269,6 +271,145 @@ class TestMcpAdd:
config = load_config()
assert config["mcp_servers"]["broken"]["enabled"] is False
def test_add_stdio_server_with_env(self, tmp_path, capsys, monkeypatch):
"""Stdio servers can persist explicit environment variables."""
fake_tools = [FakeTool("search", "Search repos")]
def mock_probe(name, config, **kw):
assert config["env"] == {
"MY_API_KEY": "secret123",
"DEBUG": "true",
}
return [(t.name, t.description) for t in fake_tools]
monkeypatch.setattr(
"hermes_cli.mcp_config._probe_single_server", mock_probe
)
monkeypatch.setattr("builtins.input", lambda _: "")
from hermes_cli.mcp_config import cmd_mcp_add
cmd_mcp_add(_make_args(
name="github",
command="npx",
args=["@mcp/github"],
env=["MY_API_KEY=secret123", "DEBUG=true"],
))
out = capsys.readouterr().out
assert "Saved" in out
from hermes_cli.config import load_config
config = load_config()
srv = config["mcp_servers"]["github"]
assert srv["env"] == {
"MY_API_KEY": "secret123",
"DEBUG": "true",
}
def test_add_stdio_server_rejects_invalid_env_name(self, capsys):
"""Invalid environment variable names are rejected up front."""
from hermes_cli.mcp_config import cmd_mcp_add
cmd_mcp_add(_make_args(
name="github",
command="npx",
args=["@mcp/github"],
env=["BAD-NAME=value"],
))
out = capsys.readouterr().out
assert "Invalid --env variable name" in out
def test_add_http_server_rejects_env_flag(self, capsys):
"""The --env flag is only valid for stdio transports."""
from hermes_cli.mcp_config import cmd_mcp_add
cmd_mcp_add(_make_args(
name="ink",
url="https://mcp.ml.ink/mcp",
env=["DEBUG=true"],
))
out = capsys.readouterr().out
assert "only supported for stdio MCP servers" in out
def test_add_preset_fills_transport(self, tmp_path, capsys, monkeypatch):
"""A preset fills in command/args when no explicit transport given."""
monkeypatch.setattr(
"hermes_cli.mcp_config._MCP_PRESETS",
{"testmcp": {"command": "npx", "args": ["-y", "test-mcp-server"], "display_name": "Test MCP"}},
)
fake_tools = [FakeTool("do_thing", "Does a thing")]
def mock_probe(name, config, **kw):
assert name == "myserver"
assert config["command"] == "npx"
assert config["args"] == ["-y", "test-mcp-server"]
assert "env" not in config
return [(t.name, t.description) for t in fake_tools]
monkeypatch.setattr(
"hermes_cli.mcp_config._probe_single_server", mock_probe
)
monkeypatch.setattr("builtins.input", lambda _: "")
from hermes_cli.mcp_config import cmd_mcp_add
from hermes_cli.config import read_raw_config
cmd_mcp_add(_make_args(name="myserver", preset="testmcp"))
out = capsys.readouterr().out
assert "Saved" in out
config = read_raw_config()
srv = config["mcp_servers"]["myserver"]
assert srv["command"] == "npx"
assert srv["args"] == ["-y", "test-mcp-server"]
assert "env" not in srv
def test_preset_does_not_override_explicit_command(self, tmp_path, capsys, monkeypatch):
"""Explicit transports win over presets."""
monkeypatch.setattr(
"hermes_cli.mcp_config._MCP_PRESETS",
{"testmcp": {"command": "npx", "args": ["-y", "test-mcp-server"], "display_name": "Test MCP"}},
)
fake_tools = [FakeTool("search", "Search repos")]
def mock_probe(name, config, **kw):
assert config["command"] == "uvx"
assert config["args"] == ["custom-server"]
assert "env" not in config
return [(t.name, t.description) for t in fake_tools]
monkeypatch.setattr(
"hermes_cli.mcp_config._probe_single_server", mock_probe
)
monkeypatch.setattr("builtins.input", lambda _: "")
from hermes_cli.mcp_config import cmd_mcp_add
from hermes_cli.config import read_raw_config
cmd_mcp_add(_make_args(
name="custom",
preset="testmcp",
command="uvx",
args=["custom-server"],
))
out = capsys.readouterr().out
assert "Saved" in out
config = read_raw_config()
srv = config["mcp_servers"]["custom"]
assert srv["command"] == "uvx"
assert srv["args"] == ["custom-server"]
assert "env" not in srv
def test_unknown_preset_rejected(self, capsys):
"""An unknown preset name is rejected with a clear error."""
from hermes_cli.mcp_config import cmd_mcp_add
cmd_mcp_add(_make_args(name="foo", preset="nonexistent"))
out = capsys.readouterr().out
assert "Unknown MCP preset" in out
# ---------------------------------------------------------------------------
# Tests: cmd_mcp_test

View file

@ -257,3 +257,76 @@ class TestProviderPersistsAfterModelSave:
assert model.get("provider") == "opencode-go"
assert model.get("default") == "minimax-m2.5"
assert model.get("api_mode") == "anthropic_messages"
class TestBaseUrlValidation:
"""Reject non-URL values in the base URL prompt (e.g. shell commands)."""
def test_invalid_base_url_rejected(self, config_home, monkeypatch, capsys):
"""Typing a non-URL string should not be saved as the base URL."""
from hermes_cli.auth import PROVIDER_REGISTRY
pconfig = PROVIDER_REGISTRY.get("zai")
if not pconfig:
pytest.skip("zai not in PROVIDER_REGISTRY")
monkeypatch.setenv("GLM_API_KEY", "test-key")
from hermes_cli.main import _model_flow_api_key_provider
from hermes_cli.config import load_config, get_env_value
# User types a shell command instead of a URL at the base URL prompt
with patch("hermes_cli.auth._prompt_model_selection", return_value="glm-5"), \
patch("hermes_cli.auth.deactivate_provider"), \
patch("builtins.input", return_value="nano ~/.hermes/.env"):
_model_flow_api_key_provider(load_config(), "zai", "old-model")
# The garbage value should NOT have been saved
saved = get_env_value("GLM_BASE_URL") or ""
assert not saved or saved.startswith(("http://", "https://")), \
f"Non-URL value was saved as GLM_BASE_URL: {saved}"
captured = capsys.readouterr()
assert "Invalid URL" in captured.out
def test_valid_base_url_accepted(self, config_home, monkeypatch):
"""A proper URL should be saved normally."""
from hermes_cli.auth import PROVIDER_REGISTRY
pconfig = PROVIDER_REGISTRY.get("zai")
if not pconfig:
pytest.skip("zai not in PROVIDER_REGISTRY")
monkeypatch.setenv("GLM_API_KEY", "test-key")
from hermes_cli.main import _model_flow_api_key_provider
from hermes_cli.config import load_config, get_env_value
with patch("hermes_cli.auth._prompt_model_selection", return_value="glm-5"), \
patch("hermes_cli.auth.deactivate_provider"), \
patch("builtins.input", return_value="https://custom.z.ai/api/paas/v4"):
_model_flow_api_key_provider(load_config(), "zai", "old-model")
saved = get_env_value("GLM_BASE_URL") or ""
assert saved == "https://custom.z.ai/api/paas/v4"
def test_empty_base_url_keeps_default(self, config_home, monkeypatch):
"""Pressing Enter (empty) should not change the base URL."""
from hermes_cli.auth import PROVIDER_REGISTRY
pconfig = PROVIDER_REGISTRY.get("zai")
if not pconfig:
pytest.skip("zai not in PROVIDER_REGISTRY")
monkeypatch.setenv("GLM_API_KEY", "test-key")
monkeypatch.delenv("GLM_BASE_URL", raising=False)
from hermes_cli.main import _model_flow_api_key_provider
from hermes_cli.config import load_config, get_env_value
with patch("hermes_cli.auth._prompt_model_selection", return_value="glm-5"), \
patch("hermes_cli.auth.deactivate_provider"), \
patch("builtins.input", return_value=""):
_model_flow_api_key_provider(load_config(), "zai", "old-model")
saved = get_env_value("GLM_BASE_URL") or ""
assert saved == "", "Empty input should not save a base URL"

View file

@ -1,8 +1,10 @@
from io import StringIO
from unittest.mock import patch
import pytest
from rich.console import Console
from cli import ChatConsole
from hermes_cli.skills_hub import do_check, do_install, do_list, do_update, handle_skills_slash
@ -179,6 +181,21 @@ def test_do_update_reinstalls_outdated_skills(monkeypatch):
assert "Updated 1 skill" in output
def test_handle_skills_slash_search_accepts_chatconsole_without_status_errors():
results = [type("R", (), {
"name": "kubernetes",
"description": "Cluster orchestration",
"source": "skills.sh",
"trust_level": "community",
"identifier": "skills-sh/example/kubernetes",
})()]
with patch("tools.skills_hub.unified_search", return_value=results), \
patch("tools.skills_hub.create_source_router", return_value={}), \
patch("tools.skills_hub.GitHubAuth"):
handle_skills_slash("/skills search kubernetes", console=ChatConsole())
def test_do_install_scans_with_resolved_identifier(monkeypatch, tmp_path, hub_env):
import tools.skills_guard as guard
import tools.skills_hub as hub

View file

@ -0,0 +1,77 @@
"""Tests for hermes_cli/tips.py — random tip display at session start."""
import pytest
from hermes_cli.tips import TIPS, get_random_tip, get_tip_count
class TestTipsCorpus:
"""Validate the tip corpus itself."""
def test_has_at_least_200_tips(self):
assert len(TIPS) >= 200, f"Expected 200+ tips, got {len(TIPS)}"
def test_no_duplicates(self):
assert len(TIPS) == len(set(TIPS)), "Duplicate tips found"
def test_all_tips_are_strings(self):
for i, tip in enumerate(TIPS):
assert isinstance(tip, str), f"Tip {i} is not a string: {type(tip)}"
def test_no_empty_tips(self):
for i, tip in enumerate(TIPS):
assert tip.strip(), f"Tip {i} is empty or whitespace-only"
def test_max_length_reasonable(self):
"""Tips should fit on a single terminal line (~120 chars max)."""
for i, tip in enumerate(TIPS):
assert len(tip) <= 150, (
f"Tip {i} too long ({len(tip)} chars): {tip[:60]}..."
)
def test_no_leading_trailing_whitespace(self):
for i, tip in enumerate(TIPS):
assert tip == tip.strip(), f"Tip {i} has leading/trailing whitespace"
class TestGetRandomTip:
"""Validate the get_random_tip() function."""
def test_returns_string(self):
tip = get_random_tip()
assert isinstance(tip, str)
assert len(tip) > 0
def test_returns_tip_from_corpus(self):
tip = get_random_tip()
assert tip in TIPS
def test_randomness(self):
"""Multiple calls should eventually return different tips."""
seen = set()
for _ in range(50):
seen.add(get_random_tip())
# With 200+ tips and 50 draws, we should see at least 10 unique
assert len(seen) >= 10, f"Only got {len(seen)} unique tips in 50 draws"
class TestGetTipCount:
def test_matches_corpus_length(self):
assert get_tip_count() == len(TIPS)
class TestTipIntegrationInCLI:
"""Test that the tip display code in cli.py works correctly."""
def test_tip_import_works(self):
"""The import used in cli.py must succeed."""
from hermes_cli.tips import get_random_tip
assert callable(get_random_tip)
def test_tip_display_format(self):
"""Verify the Rich markup format doesn't break."""
tip = get_random_tip()
color = "#B8860B"
markup = f"[dim {color}]✦ Tip: {tip}[/]"
# Should not contain nested/broken Rich tags
assert markup.count("[/]") == 1
assert "[dim #B8860B]" in markup

View file

@ -798,3 +798,120 @@ class TestFindGatewayPidsExclude:
pids = gateway_cli.find_gateway_pids()
assert pids == [100]
# ---------------------------------------------------------------------------
# Gateway mode writes exit code before restart (#8300)
# ---------------------------------------------------------------------------
class TestGatewayModeWritesExitCodeEarly:
"""When running as ``hermes update --gateway``, the exit code marker must be
written *before* the gateway restart attempt. Without this, systemd's
``KillMode=mixed`` kills the update process (and its wrapping shell) during
the cgroup teardown, so the shell epilogue that normally writes the exit
code never executes. The new gateway's update watcher then polls for 30
minutes and sends a spurious timeout message.
"""
@patch("shutil.which", return_value=None)
@patch("subprocess.run")
def test_exit_code_written_in_gateway_mode(
self, mock_run, _mock_which, capsys, tmp_path, monkeypatch,
):
monkeypatch.setattr(gateway_cli, "is_macos", lambda: False)
monkeypatch.setattr(gateway_cli, "supports_systemd_services", lambda: False)
monkeypatch.setattr(gateway_cli, "is_termux", lambda: False)
# Point HERMES_HOME at a temp dir so the marker file lands there
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
import hermes_cli.config as _cfg
monkeypatch.setattr(_cfg, "get_hermes_home", lambda: hermes_home)
# Also patch the module-level ref used by cmd_update
import hermes_cli.main as _main_mod
monkeypatch.setattr(_main_mod, "get_hermes_home", lambda: hermes_home)
mock_run.side_effect = _make_run_side_effect(commit_count="1")
args = SimpleNamespace(gateway=True)
with patch.object(gateway_cli, "find_gateway_pids", return_value=[]):
cmd_update(args)
exit_code_path = hermes_home / ".update_exit_code"
assert exit_code_path.exists(), ".update_exit_code not written in gateway mode"
assert exit_code_path.read_text() == "0"
@patch("shutil.which", return_value=None)
@patch("subprocess.run")
def test_exit_code_not_written_in_normal_mode(
self, mock_run, _mock_which, capsys, tmp_path, monkeypatch,
):
"""Non-gateway mode should NOT write the exit code (the shell does it)."""
monkeypatch.setattr(gateway_cli, "is_macos", lambda: False)
monkeypatch.setattr(gateway_cli, "supports_systemd_services", lambda: False)
monkeypatch.setattr(gateway_cli, "is_termux", lambda: False)
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
import hermes_cli.config as _cfg
monkeypatch.setattr(_cfg, "get_hermes_home", lambda: hermes_home)
import hermes_cli.main as _main_mod
monkeypatch.setattr(_main_mod, "get_hermes_home", lambda: hermes_home)
mock_run.side_effect = _make_run_side_effect(commit_count="1")
args = SimpleNamespace(gateway=False)
with patch.object(gateway_cli, "find_gateway_pids", return_value=[]):
cmd_update(args)
exit_code_path = hermes_home / ".update_exit_code"
assert not exit_code_path.exists(), ".update_exit_code should not be written outside gateway mode"
@patch("shutil.which", return_value=None)
@patch("subprocess.run")
def test_exit_code_written_before_restart_call(
self, mock_run, _mock_which, capsys, tmp_path, monkeypatch,
):
"""Exit code must exist BEFORE systemctl restart is called."""
monkeypatch.setattr(gateway_cli, "is_macos", lambda: False)
monkeypatch.setattr(gateway_cli, "supports_systemd_services", lambda: True)
monkeypatch.setattr(gateway_cli, "is_termux", lambda: False)
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
import hermes_cli.config as _cfg
monkeypatch.setattr(_cfg, "get_hermes_home", lambda: hermes_home)
import hermes_cli.main as _main_mod
monkeypatch.setattr(_main_mod, "get_hermes_home", lambda: hermes_home)
exit_code_path = hermes_home / ".update_exit_code"
# Track whether exit code exists when systemctl restart is called
exit_code_existed_at_restart = []
original_side_effect = _make_run_side_effect(
commit_count="1", systemd_active=True,
)
def tracking_side_effect(cmd, **kwargs):
joined = " ".join(str(c) for c in cmd)
if "systemctl" in joined and "restart" in joined:
exit_code_existed_at_restart.append(exit_code_path.exists())
return original_side_effect(cmd, **kwargs)
mock_run.side_effect = tracking_side_effect
args = SimpleNamespace(gateway=True)
with patch.object(gateway_cli, "find_gateway_pids", return_value=[]):
cmd_update(args)
assert exit_code_existed_at_restart, "systemctl restart was never called"
assert exit_code_existed_at_restart[0] is True, \
".update_exit_code must exist BEFORE systemctl restart (cgroup kill race)"