mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
* perf(bitwarden): persist secret-fetch cache across CLI invocations
Every `hermes` invocation paid a ~380ms tax for `bws secret list` to
Bitwarden Secrets Manager because the existing cache was in-process only.
Back-to-back `hermes chat -q`, gateway-spawned agents, and cron-launched
runs all re-fetched.
Adds a disk-persisted L2 cache at `<hermes_home>/cache/bws_cache.json`
(mode 0600, never contains the access token — only the SHA-256
fingerprint prefix). Same TTL as the in-process cache. Read on miss,
write on bws success, ignored on key mismatch / corruption / expiry.
Measured on a startup profile:
load_hermes_dotenv() cold: 372ms → warm (disk cache hit): 20ms
End-to-end `hermes --version` cold→warm: 666ms → ~295ms.
In a hermes-vs-codex benchmark across 11 single- and multi-turn tasks
(framework overhead = wall − llm − tool_exec, median over 3 trials):
cohort before after saved
single-turn (median) 2.96s 2.31s -0.65s
multi-turn (5-turn) 9.40s 8.95s -0.45s (≈0.3s/turn)
Hermes now wins head-to-head on 6/11 tasks vs codex (was 4/11 before).
The remaining ~0.6s single-turn delta is mostly Python's own import
cost in hermes_cli.main, which is a separate optimization.
* perf(cli): lazy-load model catalog + dedupe config.yaml reads at startup
Two import-time wins on top of the bws disk-cache fix:
1. Lazy-load `hermes_cli.models._PROVIDER_MODELS` via PEP 562
module-level `__getattr__`. The catalog is ~55ms of work that was
eagerly imported on every CLI invocation (line 4557 `if not
_is_termux_startup_environment(): from hermes_cli.models import
_PROVIDER_MODELS`). Audit showed every internal call site already
does its own function-local import; only test code reads
`hermes_cli.main._PROVIDER_MODELS` as a module attribute, and
__getattr__ keeps that working transparently. First access triggers
the import once and caches the result on the module via
`globals()[name] = ...`, so subsequent reads are dict lookups.
2. Dedupe the double config.yaml read in the top-of-module bootstrap.
Previously: one raw yaml.safe_load for the `security.redact_secrets`
bridge, then a separate full `load_config()` (with deep-merge) for
`network.force_ipv4`. Both keys come from the same file. Merged
into one raw yaml load.
Combined with the bws cache fix in the previous commit:
hermes --version wall time:
original (cold): 666 ms
after bws fix (warm): 295 ms
after lazy-load + dedupe: 228 ms (-67 ms additional, -66% from original)
Tests:
- tests/hermes_cli/test_api_key_providers.py: 173/173 pass
(lazy __getattr__ correctly handles
`from hermes_cli.main import _PROVIDER_MODELS`)
- tests/test_ipv4_preference.py + tests/hermes_cli/test_redact_config_bridge.py +
tests/agent/test_redact.py: 93/93 pass (dedupe preserves both bridges)
- tests/test_bitwarden_secrets.py + env_loader tests: 49/49 pass
795 lines
26 KiB
Python
795 lines
26 KiB
Python
"""Hermetic tests for the Bitwarden Secrets Manager integration.
|
|
|
|
We never hit GitHub or Bitwarden in tests — subprocess + urllib are
|
|
mocked so the suite stays fast and offline-safe. The "live" pull and
|
|
binary download are exercised manually by `hermes secrets bitwarden
|
|
setup` outside of pytest.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import io
|
|
import json
|
|
import os
|
|
import stat
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
import zipfile
|
|
from pathlib import Path
|
|
from unittest import mock
|
|
|
|
import pytest
|
|
|
|
|
|
# Make the worktree importable without depending on the installed wheel.
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
if str(ROOT) not in sys.path:
|
|
sys.path.insert(0, str(ROOT))
|
|
|
|
from agent.secret_sources import bitwarden as bw # noqa: E402
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _reset_caches():
|
|
bw._reset_cache_for_tests()
|
|
yield
|
|
bw._reset_cache_for_tests()
|
|
|
|
|
|
@pytest.fixture
|
|
def hermes_home(tmp_path, monkeypatch):
|
|
"""Point Hermes at an isolated home directory."""
|
|
home = tmp_path / ".hermes"
|
|
home.mkdir()
|
|
monkeypatch.setenv("HERMES_HOME", str(home))
|
|
# Some modules cache get_hermes_home; clear if needed.
|
|
import hermes_constants
|
|
if hasattr(hermes_constants, "_HERMES_HOME_CACHE"):
|
|
hermes_constants._HERMES_HOME_CACHE = None # type: ignore[attr-defined]
|
|
return home
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _platform_asset_name
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"system,machine,libc_text,expected",
|
|
[
|
|
("Darwin", "x86_64", "",
|
|
f"bws-macos-universal-{bw._BWS_VERSION}.zip"),
|
|
("Darwin", "arm64", "",
|
|
f"bws-macos-universal-{bw._BWS_VERSION}.zip"),
|
|
("Linux", "x86_64", "glibc",
|
|
f"bws-x86_64-unknown-linux-gnu-{bw._BWS_VERSION}.zip"),
|
|
("Linux", "x86_64", "musl libc",
|
|
f"bws-x86_64-unknown-linux-musl-{bw._BWS_VERSION}.zip"),
|
|
("Linux", "aarch64", "",
|
|
f"bws-aarch64-unknown-linux-gnu-{bw._BWS_VERSION}.zip"),
|
|
("Windows", "AMD64", "",
|
|
f"bws-x86_64-pc-windows-msvc-{bw._BWS_VERSION}.zip"),
|
|
("Windows", "ARM64", "",
|
|
f"bws-aarch64-pc-windows-msvc-{bw._BWS_VERSION}.zip"),
|
|
],
|
|
)
|
|
def test_platform_asset_name(system, machine, libc_text, expected):
|
|
with mock.patch.object(bw.platform, "system", return_value=system), \
|
|
mock.patch.object(bw.platform, "machine", return_value=machine), \
|
|
mock.patch.object(
|
|
bw.subprocess,
|
|
"run",
|
|
return_value=mock.Mock(stdout=libc_text, stderr=libc_text),
|
|
):
|
|
assert bw._platform_asset_name() == expected
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# install_bws — fully mocked HTTP
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_fake_zip(binary_bytes: bytes) -> bytes:
|
|
buf = io.BytesIO()
|
|
with zipfile.ZipFile(buf, "w") as zf:
|
|
zf.writestr("bws", binary_bytes)
|
|
return buf.getvalue()
|
|
|
|
|
|
def test_install_bws_happy_path(hermes_home, monkeypatch):
|
|
fake_binary = b"#!/bin/sh\necho 'bws fake 2.0.0'\n"
|
|
zip_bytes = _make_fake_zip(fake_binary)
|
|
asset_name = bw._platform_asset_name()
|
|
checksum_text = (
|
|
f"{hashlib.sha256(zip_bytes).hexdigest()} {asset_name}\n"
|
|
"ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff other-file\n"
|
|
)
|
|
|
|
def fake_download(url, dest):
|
|
if url.endswith(".zip"):
|
|
Path(dest).write_bytes(zip_bytes)
|
|
elif url.endswith(".txt"):
|
|
Path(dest).write_text(checksum_text)
|
|
else:
|
|
raise AssertionError(f"unexpected download url: {url}")
|
|
|
|
monkeypatch.setattr(bw, "_http_download", fake_download)
|
|
|
|
path = bw.install_bws()
|
|
assert path.exists()
|
|
assert path.read_bytes() == fake_binary
|
|
# Executable bit set
|
|
assert path.stat().st_mode & stat.S_IXUSR
|
|
|
|
|
|
def test_install_bws_checksum_mismatch(hermes_home, monkeypatch):
|
|
zip_bytes = _make_fake_zip(b"contents")
|
|
asset_name = bw._platform_asset_name()
|
|
wrong_checksum = "0" * 64
|
|
checksum_text = f"{wrong_checksum} {asset_name}\n"
|
|
|
|
def fake_download(url, dest):
|
|
if url.endswith(".zip"):
|
|
Path(dest).write_bytes(zip_bytes)
|
|
else:
|
|
Path(dest).write_text(checksum_text)
|
|
|
|
monkeypatch.setattr(bw, "_http_download", fake_download)
|
|
|
|
with pytest.raises(RuntimeError, match="Checksum mismatch"):
|
|
bw.install_bws()
|
|
|
|
|
|
def test_install_bws_missing_checksum_entry(hermes_home, monkeypatch):
|
|
zip_bytes = _make_fake_zip(b"x")
|
|
|
|
def fake_download(url, dest):
|
|
if url.endswith(".zip"):
|
|
Path(dest).write_bytes(zip_bytes)
|
|
else:
|
|
Path(dest).write_text("ffffffff some-other-file.zip\n")
|
|
|
|
monkeypatch.setattr(bw, "_http_download", fake_download)
|
|
|
|
with pytest.raises(RuntimeError, match="No checksum entry"):
|
|
bw.install_bws()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# fetch_bitwarden_secrets
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _fake_bws_payload(items):
|
|
return json.dumps(items)
|
|
|
|
|
|
def test_fetch_happy_path(monkeypatch, tmp_path):
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
payload = _fake_bws_payload([
|
|
{"key": "OPENAI_API_KEY", "value": "sk-abc"},
|
|
{"key": "ANTHROPIC_API_KEY", "value": "sk-ant-xyz"},
|
|
])
|
|
|
|
def fake_run(cmd, **kwargs):
|
|
assert cmd[0] == str(fake_binary)
|
|
assert "secret" in cmd and "list" in cmd
|
|
assert kwargs["env"]["BWS_ACCESS_TOKEN"] == "0.fake.token"
|
|
return mock.Mock(returncode=0, stdout=payload, stderr="")
|
|
|
|
monkeypatch.setattr(bw.subprocess, "run", fake_run)
|
|
|
|
secrets, warnings = bw.fetch_bitwarden_secrets(
|
|
access_token="0.fake.token",
|
|
project_id="proj-uuid",
|
|
binary=fake_binary,
|
|
use_cache=False,
|
|
)
|
|
assert secrets == {
|
|
"OPENAI_API_KEY": "sk-abc",
|
|
"ANTHROPIC_API_KEY": "sk-ant-xyz",
|
|
}
|
|
assert warnings == []
|
|
|
|
|
|
def test_fetch_skips_invalid_env_names(monkeypatch, tmp_path):
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
payload = _fake_bws_payload([
|
|
{"key": "VALID_KEY", "value": "v1"},
|
|
{"key": "1BAD_START", "value": "v2"},
|
|
{"key": "has spaces", "value": "v3"},
|
|
{"key": "DASH-KEY", "value": "v4"},
|
|
])
|
|
|
|
monkeypatch.setattr(
|
|
bw.subprocess,
|
|
"run",
|
|
lambda *a, **kw: mock.Mock(returncode=0, stdout=payload, stderr=""),
|
|
)
|
|
|
|
secrets, warnings = bw.fetch_bitwarden_secrets(
|
|
access_token="0.t",
|
|
project_id="p",
|
|
binary=fake_binary,
|
|
use_cache=False,
|
|
)
|
|
assert secrets == {"VALID_KEY": "v1"}
|
|
assert len(warnings) == 3
|
|
|
|
|
|
def test_fetch_auth_failure(monkeypatch, tmp_path):
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
|
|
monkeypatch.setattr(
|
|
bw.subprocess,
|
|
"run",
|
|
lambda *a, **kw: mock.Mock(
|
|
returncode=1, stdout="", stderr="Error: invalid access token"
|
|
),
|
|
)
|
|
|
|
with pytest.raises(RuntimeError, match="invalid access token"):
|
|
bw.fetch_bitwarden_secrets(
|
|
access_token="0.bad",
|
|
project_id="p",
|
|
binary=fake_binary,
|
|
use_cache=False,
|
|
)
|
|
|
|
|
|
def test_fetch_timeout(monkeypatch, tmp_path):
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
|
|
def fake_run(*a, **kw):
|
|
raise subprocess.TimeoutExpired(cmd="bws", timeout=30)
|
|
|
|
monkeypatch.setattr(bw.subprocess, "run", fake_run)
|
|
|
|
with pytest.raises(RuntimeError, match="timed out"):
|
|
bw.fetch_bitwarden_secrets(
|
|
access_token="0.t",
|
|
project_id="p",
|
|
binary=fake_binary,
|
|
use_cache=False,
|
|
)
|
|
|
|
|
|
def test_fetch_non_json(monkeypatch, tmp_path):
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
|
|
monkeypatch.setattr(
|
|
bw.subprocess,
|
|
"run",
|
|
lambda *a, **kw: mock.Mock(
|
|
returncode=0, stdout="not json at all", stderr=""
|
|
),
|
|
)
|
|
|
|
with pytest.raises(RuntimeError, match="non-JSON"):
|
|
bw.fetch_bitwarden_secrets(
|
|
access_token="0.t",
|
|
project_id="p",
|
|
binary=fake_binary,
|
|
use_cache=False,
|
|
)
|
|
|
|
|
|
def test_fetch_cache_hits(monkeypatch, tmp_path):
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
payload = _fake_bws_payload([{"key": "K", "value": "v"}])
|
|
|
|
call_count = {"n": 0}
|
|
def fake_run(*a, **kw):
|
|
call_count["n"] += 1
|
|
return mock.Mock(returncode=0, stdout=payload, stderr="")
|
|
|
|
monkeypatch.setattr(bw.subprocess, "run", fake_run)
|
|
|
|
bw.fetch_bitwarden_secrets(access_token="0.t", project_id="p",
|
|
binary=fake_binary, cache_ttl_seconds=60)
|
|
bw.fetch_bitwarden_secrets(access_token="0.t", project_id="p",
|
|
binary=fake_binary, cache_ttl_seconds=60)
|
|
assert call_count["n"] == 1 # cached on second call
|
|
|
|
|
|
def test_fetch_server_url_sets_env(monkeypatch, tmp_path):
|
|
"""server_url must be plumbed into the subprocess as BWS_SERVER_URL."""
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
payload = _fake_bws_payload([{"key": "K", "value": "v"}])
|
|
|
|
captured_env = {}
|
|
|
|
def fake_run(cmd, **kwargs):
|
|
captured_env.update(kwargs["env"])
|
|
return mock.Mock(returncode=0, stdout=payload, stderr="")
|
|
|
|
monkeypatch.setattr(bw.subprocess, "run", fake_run)
|
|
|
|
bw.fetch_bitwarden_secrets(
|
|
access_token="0.t",
|
|
project_id="p",
|
|
binary=fake_binary,
|
|
use_cache=False,
|
|
server_url="https://vault.bitwarden.eu",
|
|
)
|
|
assert captured_env.get("BWS_SERVER_URL") == "https://vault.bitwarden.eu"
|
|
|
|
|
|
def test_fetch_no_server_url_does_not_set_env(monkeypatch, tmp_path):
|
|
"""When server_url is empty, BWS_SERVER_URL must not be injected."""
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
payload = _fake_bws_payload([])
|
|
# Make sure the inherited env doesn't already have BWS_SERVER_URL set.
|
|
monkeypatch.delenv("BWS_SERVER_URL", raising=False)
|
|
|
|
captured_env = {}
|
|
|
|
def fake_run(cmd, **kwargs):
|
|
captured_env.update(kwargs["env"])
|
|
return mock.Mock(returncode=0, stdout=payload, stderr="")
|
|
|
|
monkeypatch.setattr(bw.subprocess, "run", fake_run)
|
|
|
|
bw.fetch_bitwarden_secrets(
|
|
access_token="0.t",
|
|
project_id="p",
|
|
binary=fake_binary,
|
|
use_cache=False,
|
|
)
|
|
assert "BWS_SERVER_URL" not in captured_env
|
|
|
|
|
|
def test_fetch_server_url_keyed_in_cache(monkeypatch, tmp_path):
|
|
"""Different server_url values must produce separate cache entries."""
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
payload = _fake_bws_payload([{"key": "K", "value": "v"}])
|
|
|
|
call_count = {"n": 0}
|
|
|
|
def fake_run(*a, **kw):
|
|
call_count["n"] += 1
|
|
return mock.Mock(returncode=0, stdout=payload, stderr="")
|
|
|
|
monkeypatch.setattr(bw.subprocess, "run", fake_run)
|
|
|
|
# US (default empty) — fresh fetch.
|
|
bw.fetch_bitwarden_secrets(
|
|
access_token="0.t", project_id="p",
|
|
binary=fake_binary, cache_ttl_seconds=60,
|
|
)
|
|
# EU — different server_url, must NOT hit the US cache entry.
|
|
bw.fetch_bitwarden_secrets(
|
|
access_token="0.t", project_id="p",
|
|
binary=fake_binary, cache_ttl_seconds=60,
|
|
server_url="https://vault.bitwarden.eu",
|
|
)
|
|
# Second EU call hits cache.
|
|
bw.fetch_bitwarden_secrets(
|
|
access_token="0.t", project_id="p",
|
|
binary=fake_binary, cache_ttl_seconds=60,
|
|
server_url="https://vault.bitwarden.eu",
|
|
)
|
|
assert call_count["n"] == 2
|
|
|
|
|
|
def test_fetch_cache_disabled(monkeypatch, tmp_path):
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
payload = _fake_bws_payload([])
|
|
call_count = {"n": 0}
|
|
def fake_run(*a, **kw):
|
|
call_count["n"] += 1
|
|
return mock.Mock(returncode=0, stdout=payload, stderr="")
|
|
monkeypatch.setattr(bw.subprocess, "run", fake_run)
|
|
|
|
bw.fetch_bitwarden_secrets(access_token="0.t", project_id="p",
|
|
binary=fake_binary, use_cache=False)
|
|
bw.fetch_bitwarden_secrets(access_token="0.t", project_id="p",
|
|
binary=fake_binary, use_cache=False)
|
|
assert call_count["n"] == 2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# apply_bitwarden_secrets — the public entry point used by env_loader
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_apply_disabled_returns_empty():
|
|
result = bw.apply_bitwarden_secrets(enabled=False, project_id="p")
|
|
assert result.ok
|
|
assert not result.applied
|
|
assert not result.error
|
|
|
|
|
|
def test_apply_missing_token(monkeypatch):
|
|
monkeypatch.delenv("BWS_ACCESS_TOKEN", raising=False)
|
|
result = bw.apply_bitwarden_secrets(
|
|
enabled=True, project_id="p", auto_install=False
|
|
)
|
|
assert not result.ok
|
|
assert "BWS_ACCESS_TOKEN" in result.error
|
|
|
|
|
|
def test_apply_missing_project_id(monkeypatch):
|
|
monkeypatch.setenv("BWS_ACCESS_TOKEN", "0.t")
|
|
result = bw.apply_bitwarden_secrets(
|
|
enabled=True, project_id="", auto_install=False
|
|
)
|
|
assert not result.ok
|
|
assert "project_id" in result.error
|
|
|
|
|
|
def test_apply_does_not_override_existing(monkeypatch, tmp_path):
|
|
monkeypatch.setenv("BWS_ACCESS_TOKEN", "0.t")
|
|
monkeypatch.setenv("OPENAI_API_KEY", "existing-value")
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
payload = _fake_bws_payload([
|
|
{"key": "OPENAI_API_KEY", "value": "bsm-value"},
|
|
{"key": "NEW_KEY", "value": "new-value"},
|
|
])
|
|
monkeypatch.setattr(
|
|
bw.subprocess, "run",
|
|
lambda *a, **kw: mock.Mock(returncode=0, stdout=payload, stderr=""),
|
|
)
|
|
monkeypatch.setattr(bw, "find_bws", lambda **kw: fake_binary)
|
|
|
|
result = bw.apply_bitwarden_secrets(
|
|
enabled=True, project_id="p",
|
|
override_existing=False, auto_install=False,
|
|
)
|
|
assert result.ok
|
|
assert "NEW_KEY" in result.applied
|
|
assert "OPENAI_API_KEY" in result.skipped
|
|
assert os.environ["OPENAI_API_KEY"] == "existing-value"
|
|
assert os.environ["NEW_KEY"] == "new-value"
|
|
|
|
|
|
def test_apply_override_existing(monkeypatch, tmp_path):
|
|
monkeypatch.setenv("BWS_ACCESS_TOKEN", "0.t")
|
|
monkeypatch.setenv("OPENAI_API_KEY", "stale")
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
payload = _fake_bws_payload([{"key": "OPENAI_API_KEY", "value": "fresh"}])
|
|
monkeypatch.setattr(
|
|
bw.subprocess, "run",
|
|
lambda *a, **kw: mock.Mock(returncode=0, stdout=payload, stderr=""),
|
|
)
|
|
monkeypatch.setattr(bw, "find_bws", lambda **kw: fake_binary)
|
|
|
|
result = bw.apply_bitwarden_secrets(
|
|
enabled=True, project_id="p",
|
|
override_existing=True, auto_install=False,
|
|
)
|
|
assert result.ok
|
|
assert os.environ["OPENAI_API_KEY"] == "fresh"
|
|
|
|
|
|
def test_apply_never_overrides_bootstrap_token(monkeypatch, tmp_path):
|
|
"""Even with override_existing=True, the access-token var is preserved."""
|
|
monkeypatch.setenv("BWS_ACCESS_TOKEN", "0.original")
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
payload = _fake_bws_payload([
|
|
{"key": "BWS_ACCESS_TOKEN", "value": "0.malicious-replacement"},
|
|
])
|
|
monkeypatch.setattr(
|
|
bw.subprocess, "run",
|
|
lambda *a, **kw: mock.Mock(returncode=0, stdout=payload, stderr=""),
|
|
)
|
|
monkeypatch.setattr(bw, "find_bws", lambda **kw: fake_binary)
|
|
|
|
result = bw.apply_bitwarden_secrets(
|
|
enabled=True, project_id="p",
|
|
override_existing=True, auto_install=False,
|
|
)
|
|
assert os.environ["BWS_ACCESS_TOKEN"] == "0.original"
|
|
assert "BWS_ACCESS_TOKEN" in result.skipped
|
|
|
|
|
|
def test_apply_swallows_fetch_errors(monkeypatch, tmp_path):
|
|
"""A fetch failure produces an error, NOT an exception."""
|
|
monkeypatch.setenv("BWS_ACCESS_TOKEN", "0.t")
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
monkeypatch.setattr(
|
|
bw.subprocess, "run",
|
|
lambda *a, **kw: mock.Mock(returncode=1, stdout="", stderr="bad token"),
|
|
)
|
|
monkeypatch.setattr(bw, "find_bws", lambda **kw: fake_binary)
|
|
|
|
result = bw.apply_bitwarden_secrets(
|
|
enabled=True, project_id="p", auto_install=False,
|
|
)
|
|
assert not result.ok
|
|
assert "bad token" in result.error
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# env_loader integration
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_env_loader_skips_when_disabled(tmp_path, monkeypatch):
|
|
"""No config.yaml present → no BSM call, no crash."""
|
|
home = tmp_path / ".hermes"
|
|
home.mkdir()
|
|
monkeypatch.setenv("HERMES_HOME", str(home))
|
|
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
|
|
|
from hermes_cli.env_loader import _apply_external_secret_sources
|
|
# Should be a no-op (returns None).
|
|
assert _apply_external_secret_sources(home) is None
|
|
|
|
|
|
def test_env_loader_calls_bsm_when_enabled(tmp_path, monkeypatch):
|
|
home = tmp_path / ".hermes"
|
|
home.mkdir()
|
|
(home / "config.yaml").write_text(
|
|
"secrets:\n"
|
|
" bitwarden:\n"
|
|
" enabled: true\n"
|
|
" project_id: 'proj-1'\n"
|
|
" access_token_env: 'BWS_ACCESS_TOKEN'\n"
|
|
" cache_ttl_seconds: 0\n"
|
|
" override_existing: false\n"
|
|
" auto_install: false\n"
|
|
)
|
|
monkeypatch.setenv("HERMES_HOME", str(home))
|
|
monkeypatch.setenv("BWS_ACCESS_TOKEN", "0.t")
|
|
monkeypatch.delenv("MY_BSM_KEY", raising=False)
|
|
|
|
called = {"n": 0}
|
|
def fake_apply(**kwargs):
|
|
called["n"] += 1
|
|
assert kwargs["enabled"] is True
|
|
assert kwargs["project_id"] == "proj-1"
|
|
os.environ["MY_BSM_KEY"] = "from-bsm"
|
|
return bw.FetchResult(
|
|
secrets={"MY_BSM_KEY": "from-bsm"},
|
|
applied=["MY_BSM_KEY"],
|
|
)
|
|
|
|
monkeypatch.setattr(
|
|
"agent.secret_sources.bitwarden.apply_bitwarden_secrets",
|
|
fake_apply,
|
|
)
|
|
|
|
from hermes_cli.env_loader import _apply_external_secret_sources
|
|
_apply_external_secret_sources(home)
|
|
|
|
assert called["n"] == 1
|
|
assert os.environ.get("MY_BSM_KEY") == "from-bsm"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Disk-persisted cache (cross-process — speeds up back-to-back CLI invocations)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_disk_cache_written_after_first_fetch(monkeypatch, tmp_path):
|
|
"""First fetch hits bws AND writes a 0600 file under hermes_home/cache/."""
|
|
home = tmp_path / ".hermes"
|
|
home.mkdir()
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
payload = _fake_bws_payload([{"key": "K1", "value": "v1"}])
|
|
|
|
call_count = {"n": 0}
|
|
def fake_run(*a, **kw):
|
|
call_count["n"] += 1
|
|
return mock.Mock(returncode=0, stdout=payload, stderr="")
|
|
monkeypatch.setattr(bw.subprocess, "run", fake_run)
|
|
bw._reset_cache_for_tests(home)
|
|
|
|
secrets, _ = bw.fetch_bitwarden_secrets(
|
|
access_token="0.t", project_id="proj-1", binary=fake_binary,
|
|
cache_ttl_seconds=300, home_path=home,
|
|
)
|
|
assert secrets == {"K1": "v1"}
|
|
assert call_count["n"] == 1
|
|
|
|
cache_path = bw._disk_cache_path(home)
|
|
assert cache_path.exists()
|
|
# Mode must be 0600 — disk cache contains plaintext secret values
|
|
mode = os.stat(cache_path).st_mode & 0o777
|
|
assert mode == 0o600, f"expected 0o600, got 0o{mode:o}"
|
|
|
|
# File contents: key (fingerprint not raw token), secrets dict, fetched_at
|
|
payload_disk = json.loads(cache_path.read_text())
|
|
assert set(payload_disk.keys()) == {"key", "secrets", "fetched_at"}
|
|
assert payload_disk["secrets"] == {"K1": "v1"}
|
|
# Critically, the raw access token must NOT appear anywhere in the file
|
|
assert "0.t" not in cache_path.read_text()
|
|
|
|
|
|
def test_disk_cache_short_circuits_bws_when_fresh(monkeypatch, tmp_path):
|
|
"""Second fetch (different process simulation) skips bws entirely."""
|
|
home = tmp_path / ".hermes"
|
|
home.mkdir()
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
payload = _fake_bws_payload([{"key": "K1", "value": "v1"}])
|
|
|
|
call_count = {"n": 0}
|
|
def fake_run(*a, **kw):
|
|
call_count["n"] += 1
|
|
return mock.Mock(returncode=0, stdout=payload, stderr="")
|
|
monkeypatch.setattr(bw.subprocess, "run", fake_run)
|
|
bw._reset_cache_for_tests(home)
|
|
|
|
# First call: hits bws, populates disk cache
|
|
bw.fetch_bitwarden_secrets(
|
|
access_token="0.t", project_id="proj-1", binary=fake_binary,
|
|
cache_ttl_seconds=300, home_path=home,
|
|
)
|
|
assert call_count["n"] == 1
|
|
|
|
# Clear ONLY the in-process cache to simulate a fresh subprocess.
|
|
bw._CACHE.clear()
|
|
|
|
secrets2, _ = bw.fetch_bitwarden_secrets(
|
|
access_token="0.t", project_id="proj-1", binary=fake_binary,
|
|
cache_ttl_seconds=300, home_path=home,
|
|
)
|
|
assert secrets2 == {"K1": "v1"}
|
|
# Critical: bws was NOT invoked the second time
|
|
assert call_count["n"] == 1
|
|
|
|
|
|
def test_disk_cache_expires_with_ttl(monkeypatch, tmp_path):
|
|
"""Stale disk cache (older than ttl) triggers a refetch."""
|
|
home = tmp_path / ".hermes"
|
|
home.mkdir()
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
payload = _fake_bws_payload([{"key": "K1", "value": "v1"}])
|
|
|
|
call_count = {"n": 0}
|
|
def fake_run(*a, **kw):
|
|
call_count["n"] += 1
|
|
return mock.Mock(returncode=0, stdout=payload, stderr="")
|
|
monkeypatch.setattr(bw.subprocess, "run", fake_run)
|
|
bw._reset_cache_for_tests(home)
|
|
|
|
# First call
|
|
bw.fetch_bitwarden_secrets(
|
|
access_token="0.t", project_id="proj-1", binary=fake_binary,
|
|
cache_ttl_seconds=300, home_path=home,
|
|
)
|
|
assert call_count["n"] == 1
|
|
|
|
# Backdate the disk cache so the TTL window has passed
|
|
cache_path = bw._disk_cache_path(home)
|
|
payload_disk = json.loads(cache_path.read_text())
|
|
payload_disk["fetched_at"] = time.time() - 10_000
|
|
cache_path.write_text(json.dumps(payload_disk))
|
|
bw._CACHE.clear()
|
|
|
|
# Second call: stale disk → refetch
|
|
bw.fetch_bitwarden_secrets(
|
|
access_token="0.t", project_id="proj-1", binary=fake_binary,
|
|
cache_ttl_seconds=300, home_path=home,
|
|
)
|
|
assert call_count["n"] == 2
|
|
|
|
|
|
def test_disk_cache_key_mismatch_triggers_refetch(monkeypatch, tmp_path):
|
|
"""Disk cache entry written by a different token/project is ignored."""
|
|
home = tmp_path / ".hermes"
|
|
home.mkdir()
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
payload = _fake_bws_payload([{"key": "K1", "value": "v1"}])
|
|
|
|
call_count = {"n": 0}
|
|
def fake_run(*a, **kw):
|
|
call_count["n"] += 1
|
|
return mock.Mock(returncode=0, stdout=payload, stderr="")
|
|
monkeypatch.setattr(bw.subprocess, "run", fake_run)
|
|
bw._reset_cache_for_tests(home)
|
|
|
|
# Write a cache entry for a DIFFERENT token/project pair
|
|
cache_path = bw._disk_cache_path(home)
|
|
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
|
cache_path.write_text(json.dumps({
|
|
"key": "deadbeef00000000|other-project|",
|
|
"secrets": {"OTHER": "should-not-leak"},
|
|
"fetched_at": time.time(),
|
|
}))
|
|
|
|
secrets, _ = bw.fetch_bitwarden_secrets(
|
|
access_token="0.t", project_id="proj-1", binary=fake_binary,
|
|
cache_ttl_seconds=300, home_path=home,
|
|
)
|
|
# We must NOT have used the foreign cache entry
|
|
assert secrets == {"K1": "v1"}
|
|
assert "OTHER" not in secrets
|
|
assert call_count["n"] == 1
|
|
|
|
|
|
def test_disk_cache_use_cache_false_skips_disk(monkeypatch, tmp_path):
|
|
"""use_cache=False must skip BOTH in-process and disk caches."""
|
|
home = tmp_path / ".hermes"
|
|
home.mkdir()
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
payload = _fake_bws_payload([{"key": "K1", "value": "v1"}])
|
|
|
|
call_count = {"n": 0}
|
|
def fake_run(*a, **kw):
|
|
call_count["n"] += 1
|
|
return mock.Mock(returncode=0, stdout=payload, stderr="")
|
|
monkeypatch.setattr(bw.subprocess, "run", fake_run)
|
|
bw._reset_cache_for_tests(home)
|
|
|
|
# First call WITH cache populates disk
|
|
bw.fetch_bitwarden_secrets(
|
|
access_token="0.t", project_id="proj-1", binary=fake_binary,
|
|
cache_ttl_seconds=300, use_cache=True, home_path=home,
|
|
)
|
|
assert call_count["n"] == 1
|
|
bw._CACHE.clear()
|
|
|
|
# Second call with use_cache=False MUST hit bws again even though disk is fresh
|
|
bw.fetch_bitwarden_secrets(
|
|
access_token="0.t", project_id="proj-1", binary=fake_binary,
|
|
cache_ttl_seconds=300, use_cache=False, home_path=home,
|
|
)
|
|
assert call_count["n"] == 2
|
|
|
|
|
|
def test_disk_cache_corrupt_file_falls_through(monkeypatch, tmp_path):
|
|
"""A garbage cache file must NOT crash startup — we refetch."""
|
|
home = tmp_path / ".hermes"
|
|
home.mkdir()
|
|
fake_binary = tmp_path / "bws"
|
|
fake_binary.write_text("")
|
|
payload = _fake_bws_payload([{"key": "K1", "value": "v1"}])
|
|
|
|
monkeypatch.setattr(
|
|
bw.subprocess, "run",
|
|
lambda *a, **kw: mock.Mock(returncode=0, stdout=payload, stderr=""),
|
|
)
|
|
bw._reset_cache_for_tests(home)
|
|
|
|
# Write a corrupt cache file
|
|
cache_path = bw._disk_cache_path(home)
|
|
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
|
cache_path.write_text("not json {{{")
|
|
|
|
secrets, _ = bw.fetch_bitwarden_secrets(
|
|
access_token="0.t", project_id="proj-1", binary=fake_binary,
|
|
cache_ttl_seconds=300, home_path=home,
|
|
)
|
|
# Refetched cleanly
|
|
assert secrets == {"K1": "v1"}
|
|
# And the corrupt file was replaced with a valid one
|
|
assert json.loads(cache_path.read_text())["secrets"] == {"K1": "v1"}
|
|
|
|
|
|
def test_reset_cache_for_tests_deletes_disk_file(tmp_path):
|
|
"""_reset_cache_for_tests(home_path) must also clean disk."""
|
|
home = tmp_path / ".hermes"
|
|
home.mkdir()
|
|
cache_path = bw._disk_cache_path(home)
|
|
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
|
cache_path.write_text("{}")
|
|
assert cache_path.exists()
|
|
|
|
bw._reset_cache_for_tests(home)
|
|
assert not cache_path.exists()
|
|
# Idempotent
|
|
bw._reset_cache_for_tests(home)
|