mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
fix(cli): show masked feedback for secret prompts
This commit is contained in:
parent
d952b377aa
commit
ec4d6f1823
20 changed files with 310 additions and 61 deletions
|
|
@ -2,7 +2,6 @@
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
from getpass import getpass
|
||||
import math
|
||||
import sys
|
||||
import time
|
||||
|
|
@ -30,6 +29,7 @@ from agent.credential_pool import (
|
|||
import hermes_cli.auth as auth_mod
|
||||
from hermes_cli.auth import PROVIDER_REGISTRY
|
||||
from hermes_constants import OPENROUTER_BASE_URL
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt
|
||||
|
||||
|
||||
# Providers that support OAuth login in addition to API keys.
|
||||
|
|
@ -196,7 +196,7 @@ def auth_add_command(args) -> None:
|
|||
if requested_type == AUTH_TYPE_API_KEY:
|
||||
token = (getattr(args, "api_key", None) or "").strip()
|
||||
if not token:
|
||||
token = getpass("Paste your API key: ").strip()
|
||||
token = masked_secret_prompt("Paste your API key: ").strip()
|
||||
if not token:
|
||||
raise SystemExit("No API key provided.")
|
||||
default_label = _api_key_default_label(len(pool.entries()) + 1)
|
||||
|
|
|
|||
|
|
@ -8,10 +8,10 @@ with the TUI.
|
|||
|
||||
import queue
|
||||
import time as _time
|
||||
import getpass
|
||||
|
||||
from hermes_cli.banner import cprint, _DIM, _RST
|
||||
from hermes_cli.config import save_env_value_secure
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt
|
||||
from hermes_constants import display_hermes_home
|
||||
|
||||
|
||||
|
|
@ -75,7 +75,7 @@ def prompt_for_secret(cli, var_name: str, prompt: str, metadata=None) -> dict:
|
|||
if not hasattr(cli, "_secret_deadline"):
|
||||
cli._secret_deadline = 0
|
||||
try:
|
||||
value = getpass.getpass(f"{prompt} (hidden, ESC or empty Enter to skip): ")
|
||||
value = masked_secret_prompt(f"{prompt} (hidden, ESC or empty Enter to skip): ")
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
value = ""
|
||||
|
||||
|
|
|
|||
|
|
@ -5,9 +5,8 @@ functions previously duplicated across setup.py, tools_config.py,
|
|||
mcp_config.py, and memory_setup.py.
|
||||
"""
|
||||
|
||||
import getpass
|
||||
|
||||
from hermes_cli.colors import Colors, color
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt
|
||||
|
||||
|
||||
# ─── Print Helpers ────────────────────────────────────────────────────────────
|
||||
|
|
@ -59,7 +58,7 @@ def prompt(
|
|||
|
||||
try:
|
||||
if password:
|
||||
value = getpass.getpass(display)
|
||||
value = masked_secret_prompt(display)
|
||||
else:
|
||||
value = input(display)
|
||||
value = value.strip()
|
||||
|
|
|
|||
|
|
@ -26,6 +26,8 @@ from dataclasses import dataclass
|
|||
from pathlib import Path
|
||||
from typing import Dict, Any, Optional, List, Tuple
|
||||
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Track which (config_path, mtime_ns, size) tuples we've already warned about
|
||||
|
|
@ -4004,8 +4006,7 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A
|
|||
print(f" Get your key at: {var['url']}")
|
||||
|
||||
if var.get("password"):
|
||||
import getpass
|
||||
value = getpass.getpass(f" {var['prompt']}: ")
|
||||
value = masked_secret_prompt(f" {var['prompt']}: ")
|
||||
else:
|
||||
value = input(f" {var['prompt']}: ").strip()
|
||||
|
||||
|
|
@ -4056,8 +4057,9 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A
|
|||
else:
|
||||
print(f" {info.get('description', name)}")
|
||||
if info.get("password"):
|
||||
import getpass
|
||||
value = getpass.getpass(f" {info.get('prompt', name)} (Enter to skip): ")
|
||||
value = masked_secret_prompt(
|
||||
f" {info.get('prompt', name)} (Enter to skip): "
|
||||
)
|
||||
else:
|
||||
value = input(f" {info.get('prompt', name)} (Enter to skip): ").strip()
|
||||
if value:
|
||||
|
|
|
|||
|
|
@ -2803,7 +2803,7 @@ def _aux_flow_provider_model(
|
|||
|
||||
def _aux_flow_custom_endpoint(task: str, task_cfg: dict) -> None:
|
||||
"""Prompt for a direct OpenAI-compatible base_url + optional api_key/model."""
|
||||
import getpass
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt
|
||||
|
||||
display_name = next((name for key, name, _ in _all_aux_tasks() if key == task), task)
|
||||
current_base_url = str(task_cfg.get("base_url") or "").strip()
|
||||
|
|
@ -2837,7 +2837,7 @@ def _aux_flow_custom_endpoint(task: str, task_cfg: dict) -> None:
|
|||
return
|
||||
model = model or current_model
|
||||
try:
|
||||
api_key = getpass.getpass(
|
||||
api_key = masked_secret_prompt(
|
||||
"API key (optional, blank = use OPENAI_API_KEY): "
|
||||
).strip()
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
|
|
@ -3561,6 +3561,7 @@ def _model_flow_custom(config):
|
|||
"""
|
||||
from hermes_cli.auth import _save_model_choice, deactivate_provider
|
||||
from hermes_cli.config import get_env_value, load_config, save_config
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt
|
||||
|
||||
current_url = get_env_value("OPENAI_BASE_URL") or ""
|
||||
current_key = get_env_value("OPENAI_API_KEY") or ""
|
||||
|
|
@ -3576,9 +3577,7 @@ def _model_flow_custom(config):
|
|||
base_url = input(
|
||||
f"API base URL [{current_url or 'e.g. https://api.example.com/v1'}]: "
|
||||
).strip()
|
||||
import getpass
|
||||
|
||||
api_key = getpass.getpass(
|
||||
api_key = masked_secret_prompt(
|
||||
f"API key [{current_key[:8] + '...' if current_key else 'optional'}]: "
|
||||
).strip()
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
|
|
@ -3990,7 +3989,6 @@ def _model_flow_azure_foundry(config, current_model=""):
|
|||
save_config,
|
||||
)
|
||||
from hermes_cli import azure_detect
|
||||
import getpass
|
||||
|
||||
# ── Load current Azure Foundry configuration ─────────────────────
|
||||
model_cfg = config.get("model", {})
|
||||
|
|
@ -4153,8 +4151,10 @@ def _model_flow_azure_foundry(config, current_model=""):
|
|||
token_provider = None
|
||||
else:
|
||||
print()
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt
|
||||
|
||||
try:
|
||||
api_key = getpass.getpass(
|
||||
api_key = masked_secret_prompt(
|
||||
f"API key [{current_api_key[:8] + '...' if current_api_key else 'required'}]: "
|
||||
).strip()
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
|
|
@ -4725,10 +4725,10 @@ def _model_flow_copilot(config, current_model=""):
|
|||
print(f" Login failed: {exc}")
|
||||
return
|
||||
elif choice == "2":
|
||||
try:
|
||||
import getpass
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt
|
||||
|
||||
new_key = getpass.getpass(" Token (COPILOT_GITHUB_TOKEN): ").strip()
|
||||
try:
|
||||
new_key = masked_secret_prompt(" Token (COPILOT_GITHUB_TOKEN): ").strip()
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
print()
|
||||
return
|
||||
|
|
@ -4980,10 +4980,9 @@ def _prompt_api_key(pconfig, existing_key: str, provider_id: str = "") -> tuple:
|
|||
``return`` immediately — the user cancelled entry, declined to replace, or
|
||||
cleared the key and is now unconfigured.
|
||||
"""
|
||||
import getpass
|
||||
|
||||
from hermes_cli.auth import LMSTUDIO_NOAUTH_PLACEHOLDER
|
||||
from hermes_cli.config import save_env_value
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt
|
||||
|
||||
key_env = pconfig.api_key_env_vars[0] if pconfig.api_key_env_vars else ""
|
||||
|
||||
|
|
@ -4993,7 +4992,7 @@ def _prompt_api_key(pconfig, existing_key: str, provider_id: str = "") -> tuple:
|
|||
else:
|
||||
prompt = f"{key_env} (or Enter to cancel): "
|
||||
try:
|
||||
entered = getpass.getpass(prompt).strip()
|
||||
entered = masked_secret_prompt(prompt).strip()
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
print()
|
||||
return ""
|
||||
|
|
@ -5308,10 +5307,10 @@ def _model_flow_bedrock_api_key(config, region, current_model=""):
|
|||
else:
|
||||
print(f" Endpoint: {mantle_base_url}")
|
||||
print()
|
||||
try:
|
||||
import getpass
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt
|
||||
|
||||
api_key = getpass.getpass(" Bedrock API Key: ").strip()
|
||||
try:
|
||||
api_key = masked_secret_prompt(" Bedrock API Key: ").strip()
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
print()
|
||||
return
|
||||
|
|
@ -5883,10 +5882,10 @@ def _run_anthropic_oauth_flow(save_env_value):
|
|||
print()
|
||||
print(" If the setup-token was displayed above, paste it here:")
|
||||
print()
|
||||
try:
|
||||
import getpass
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt
|
||||
|
||||
manual_token = getpass.getpass(
|
||||
try:
|
||||
manual_token = masked_secret_prompt(
|
||||
" Paste setup-token (or Enter to cancel): "
|
||||
).strip()
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
|
|
@ -5914,10 +5913,10 @@ def _run_anthropic_oauth_flow(save_env_value):
|
|||
print()
|
||||
print(" Or paste an existing setup-token now (sk-ant-oat-...):")
|
||||
print()
|
||||
try:
|
||||
import getpass
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt
|
||||
|
||||
token = getpass.getpass(" Setup-token (or Enter to cancel): ").strip()
|
||||
try:
|
||||
token = masked_secret_prompt(" Setup-token (or Enter to cancel): ").strip()
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
print()
|
||||
return False
|
||||
|
|
@ -6032,10 +6031,10 @@ def _model_flow_anthropic(config, current_model=""):
|
|||
print()
|
||||
print(" Get an API key at: https://platform.claude.com/settings/keys")
|
||||
print()
|
||||
try:
|
||||
import getpass
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt
|
||||
|
||||
api_key = getpass.getpass(" API key (sk-ant-...): ").strip()
|
||||
try:
|
||||
api_key = masked_secret_prompt(" API key (sk-ant-...): ").strip()
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
print()
|
||||
return
|
||||
|
|
|
|||
|
|
@ -7,13 +7,13 @@ the provider's config schema. Writes config to config.yaml + .env.
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import getpass
|
||||
import os
|
||||
import sys
|
||||
import shlex
|
||||
from pathlib import Path
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
@ -39,12 +39,7 @@ def _prompt(label: str, default: str | None = None, secret: bool = False) -> str
|
|||
"""Prompt for a value with optional default and secret masking."""
|
||||
suffix = f" [{default}]" if default else ""
|
||||
if secret:
|
||||
sys.stdout.write(f" {label}{suffix}: ")
|
||||
sys.stdout.flush()
|
||||
if sys.stdin.isatty():
|
||||
val = getpass.getpass(prompt="")
|
||||
else:
|
||||
val = sys.stdin.readline().strip()
|
||||
val = masked_secret_prompt(f" {label}{suffix}: ")
|
||||
else:
|
||||
sys.stdout.write(f" {label}{suffix}: ")
|
||||
sys.stdout.flush()
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ from typing import Any, Optional
|
|||
|
||||
from hermes_constants import get_hermes_home
|
||||
from hermes_cli.config import cfg_get
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -287,8 +288,7 @@ def _prompt_plugin_env_vars(manifest: dict, console) -> None:
|
|||
|
||||
try:
|
||||
if secret:
|
||||
import getpass
|
||||
value = getpass.getpass(f" {name}: ").strip()
|
||||
value = masked_secret_prompt(f" {name}: ").strip()
|
||||
else:
|
||||
value = input(f" {name}: ").strip()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
|
|
|
|||
126
hermes_cli/secret_prompt.py
Normal file
126
hermes_cli/secret_prompt.py
Normal file
|
|
@ -0,0 +1,126 @@
|
|||
"""Secret input prompts with masked typing feedback."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import getpass
|
||||
import os
|
||||
import sys
|
||||
from collections.abc import Callable
|
||||
|
||||
|
||||
_BACKSPACE_CHARS = {"\b", "\x7f"}
|
||||
_ENTER_CHARS = {"\r", "\n"}
|
||||
_EOF_CHARS = {"\x04", "\x1a"}
|
||||
|
||||
|
||||
def _collect_masked_input(
|
||||
read_char: Callable[[], str],
|
||||
write: Callable[[str], object],
|
||||
prompt: str,
|
||||
*,
|
||||
mask: str = "*",
|
||||
) -> str:
|
||||
"""Read one secret line while writing a mask character per typed char."""
|
||||
value: list[str] = []
|
||||
write(prompt)
|
||||
|
||||
while True:
|
||||
ch = read_char()
|
||||
if ch == "":
|
||||
write("\n")
|
||||
raise EOFError
|
||||
if ch in _ENTER_CHARS:
|
||||
write("\n")
|
||||
return "".join(value)
|
||||
if ch == "\x03":
|
||||
write("\n")
|
||||
raise KeyboardInterrupt
|
||||
if ch in _EOF_CHARS:
|
||||
write("\n")
|
||||
raise EOFError
|
||||
if ch in _BACKSPACE_CHARS:
|
||||
if value:
|
||||
value.pop()
|
||||
write("\b \b")
|
||||
continue
|
||||
if ch == "\x1b":
|
||||
# Ignore escape itself. Terminals commonly send escape-prefixed
|
||||
# navigation/delete sequences; they should not become secret text.
|
||||
continue
|
||||
|
||||
value.append(ch)
|
||||
if mask:
|
||||
write(mask)
|
||||
|
||||
|
||||
def masked_secret_prompt(prompt: str, *, mask: str = "*") -> str:
|
||||
"""Prompt for a secret while showing masked typing feedback.
|
||||
|
||||
Falls back to ``getpass.getpass`` when stdin/stdout are not interactive or
|
||||
when raw terminal handling is unavailable.
|
||||
"""
|
||||
stdin = sys.stdin
|
||||
stdout = sys.stdout
|
||||
|
||||
if not _stream_is_tty(stdin) or not _stream_is_tty(stdout):
|
||||
return getpass.getpass(prompt)
|
||||
|
||||
if os.name == "nt":
|
||||
try:
|
||||
return _masked_secret_prompt_windows(prompt, mask=mask)
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
raise
|
||||
except Exception:
|
||||
return getpass.getpass(prompt)
|
||||
|
||||
try:
|
||||
return _masked_secret_prompt_posix(prompt, mask=mask)
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
raise
|
||||
except Exception:
|
||||
return getpass.getpass(prompt)
|
||||
|
||||
|
||||
def _stream_is_tty(stream) -> bool:
|
||||
try:
|
||||
return bool(stream.isatty())
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _masked_secret_prompt_windows(prompt: str, *, mask: str) -> str:
|
||||
import msvcrt
|
||||
|
||||
def read_char() -> str:
|
||||
ch = msvcrt.getwch()
|
||||
if ch in {"\x00", "\xe0"}:
|
||||
msvcrt.getwch()
|
||||
return "\x1b"
|
||||
return ch
|
||||
|
||||
def write(text: str) -> None:
|
||||
sys.stdout.write(text)
|
||||
sys.stdout.flush()
|
||||
|
||||
return _collect_masked_input(read_char, write, prompt, mask=mask)
|
||||
|
||||
|
||||
def _masked_secret_prompt_posix(prompt: str, *, mask: str) -> str:
|
||||
import termios
|
||||
import tty
|
||||
|
||||
fd = sys.stdin.fileno()
|
||||
old_attrs = termios.tcgetattr(fd)
|
||||
|
||||
def read_char() -> str:
|
||||
return sys.stdin.read(1)
|
||||
|
||||
def write(text: str) -> None:
|
||||
sys.stdout.write(text)
|
||||
sys.stdout.flush()
|
||||
|
||||
try:
|
||||
tty.setraw(fd)
|
||||
return _collect_masked_input(read_char, write, prompt, mask=mask)
|
||||
finally:
|
||||
termios.tcsetattr(fd, termios.TCSADRAIN, old_attrs)
|
||||
|
|
@ -11,7 +11,6 @@ Subcommands:
|
|||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import getpass
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
|
|
@ -30,6 +29,7 @@ from hermes_cli.config import (
|
|||
save_config,
|
||||
save_env_value,
|
||||
)
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
@ -140,7 +140,7 @@ def cmd_setup(args: argparse.Namespace) -> int:
|
|||
|
||||
token = (args.access_token or "").strip()
|
||||
if not token:
|
||||
token = getpass.getpass(f" Paste access token ({token_env}): ").strip()
|
||||
token = masked_secret_prompt(f" Paste access token ({token_env}): ").strip()
|
||||
if not token:
|
||||
console.print(" [red]Empty token, aborting.[/red]")
|
||||
return 1
|
||||
|
|
|
|||
|
|
@ -161,6 +161,7 @@ from hermes_cli.cli_output import ( # noqa: E402
|
|||
print_success,
|
||||
print_warning,
|
||||
)
|
||||
from hermes_cli.secret_prompt import masked_secret_prompt # noqa: E402
|
||||
|
||||
|
||||
def is_interactive_stdin() -> bool:
|
||||
|
|
@ -202,9 +203,7 @@ def prompt(question: str, default: str = None, password: bool = False) -> str:
|
|||
|
||||
try:
|
||||
if password:
|
||||
import getpass
|
||||
|
||||
value = getpass.getpass(color(display, Colors.YELLOW))
|
||||
value = masked_secret_prompt(color(display, Colors.YELLOW))
|
||||
else:
|
||||
value = input(color(display, Colors.YELLOW))
|
||||
|
||||
|
|
|
|||
|
|
@ -534,7 +534,7 @@ def test_model_flow_custom_saves_verified_v1_base_url(monkeypatch, capsys):
|
|||
# then display name. The api_mode prompt also runs before model selection.
|
||||
answers = iter(["http://localhost:8000", "local-key", "", "", "", "", ""])
|
||||
monkeypatch.setattr("builtins.input", lambda _prompt="": next(answers))
|
||||
monkeypatch.setattr("getpass.getpass", lambda _prompt="": next(answers))
|
||||
monkeypatch.setattr("hermes_cli.secret_prompt.masked_secret_prompt", lambda _prompt="": next(answers))
|
||||
|
||||
hermes_main._model_flow_custom({})
|
||||
output = capsys.readouterr().out
|
||||
|
|
@ -592,7 +592,7 @@ def test_model_flow_custom_persists_selected_api_mode(monkeypatch):
|
|||
]
|
||||
)
|
||||
monkeypatch.setattr("builtins.input", lambda _prompt="": next(answers))
|
||||
monkeypatch.setattr("getpass.getpass", lambda _prompt="": "test-key")
|
||||
monkeypatch.setattr("hermes_cli.secret_prompt.masked_secret_prompt", lambda _prompt="": "test-key")
|
||||
|
||||
hermes_main._model_flow_custom({"model": {"provider": "custom"}})
|
||||
|
||||
|
|
|
|||
|
|
@ -83,10 +83,10 @@ def test_cancel_secret_capture_marks_setup_skipped():
|
|||
assert cli._secret_deadline == 0
|
||||
|
||||
|
||||
def test_secret_capture_uses_getpass_without_tui():
|
||||
def test_secret_capture_uses_masked_prompt_without_tui():
|
||||
cli = _make_cli_stub()
|
||||
|
||||
with patch("hermes_cli.callbacks.getpass.getpass", return_value="secret-value"), patch(
|
||||
with patch("hermes_cli.callbacks.masked_secret_prompt", return_value="secret-value"), patch(
|
||||
"hermes_cli.callbacks.save_env_value_secure"
|
||||
) as save_secret:
|
||||
save_secret.return_value = {
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ class TestStaleOAuthTokenDetection:
|
|||
|
||||
# Simulate user types "3" (Cancel) when prompted for re-auth
|
||||
monkeypatch.setattr("builtins.input", lambda _: "3")
|
||||
monkeypatch.setattr("getpass.getpass", lambda _: "")
|
||||
monkeypatch.setattr("hermes_cli.secret_prompt.masked_secret_prompt", lambda _: "")
|
||||
|
||||
from hermes_cli.main import _model_flow_anthropic
|
||||
cfg = {}
|
||||
|
|
|
|||
|
|
@ -40,7 +40,10 @@ def test_run_anthropic_oauth_flow_manual_token_still_persists(tmp_path, monkeypa
|
|||
monkeypatch.setattr("agent.anthropic_adapter.read_claude_code_credentials", lambda: None)
|
||||
monkeypatch.setattr("agent.anthropic_adapter.is_claude_code_token_valid", lambda creds: False)
|
||||
monkeypatch.setattr("builtins.input", lambda _prompt="": "sk-ant-oat01-manual-token")
|
||||
monkeypatch.setattr("getpass.getpass", lambda _prompt="": "sk-ant-oat01-manual-token")
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.secret_prompt.masked_secret_prompt",
|
||||
lambda _prompt="": "sk-ant-oat01-manual-token",
|
||||
)
|
||||
|
||||
from hermes_cli.main import _run_anthropic_oauth_flow
|
||||
|
||||
|
|
|
|||
20
tests/hermes_cli/test_cli_output.py
Normal file
20
tests/hermes_cli/test_cli_output.py
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
from hermes_cli import cli_output
|
||||
|
||||
|
||||
def test_password_prompt_uses_masked_secret_prompt(monkeypatch):
|
||||
seen = {}
|
||||
|
||||
def fake_masked_secret_prompt(display):
|
||||
seen["display"] = display
|
||||
return " secret "
|
||||
|
||||
monkeypatch.setattr(cli_output, "masked_secret_prompt", fake_masked_secret_prompt)
|
||||
|
||||
assert cli_output.prompt("API key", default="old", password=True) == "secret"
|
||||
assert "API key [old]" in seen["display"]
|
||||
|
||||
|
||||
def test_empty_password_prompt_returns_default(monkeypatch):
|
||||
monkeypatch.setattr(cli_output, "masked_secret_prompt", lambda _display: "")
|
||||
|
||||
assert cli_output.prompt("API key", default="old", password=True) == "old"
|
||||
|
|
@ -486,6 +486,49 @@ class TestOptionalEnvVarsRegistry:
|
|||
assert "TAVILY_API_KEY" in all_vars
|
||||
|
||||
|
||||
class TestConfigMigrationSecretPrompts:
|
||||
def test_required_secret_env_prompt_uses_masked_prompt(self, tmp_path, monkeypatch):
|
||||
from hermes_cli import config as cfg_mod
|
||||
|
||||
saved = {}
|
||||
|
||||
monkeypatch.setattr(cfg_mod, "sanitize_env_file", lambda: 0)
|
||||
monkeypatch.setattr(cfg_mod, "check_config_version", lambda: (999, 999))
|
||||
monkeypatch.setattr(cfg_mod, "get_missing_config_fields", lambda: [])
|
||||
monkeypatch.setattr(cfg_mod, "get_missing_skill_config_vars", lambda: [])
|
||||
monkeypatch.setattr(
|
||||
cfg_mod,
|
||||
"get_missing_env_vars",
|
||||
lambda required_only=True: [
|
||||
{
|
||||
"name": "TEST_API_KEY",
|
||||
"description": "Test key",
|
||||
"prompt": "Test API key",
|
||||
"password": True,
|
||||
}
|
||||
]
|
||||
if required_only
|
||||
else [],
|
||||
)
|
||||
def fake_masked_secret_prompt(prompt):
|
||||
saved["prompt"] = prompt
|
||||
return "secret"
|
||||
|
||||
monkeypatch.setattr(cfg_mod, "masked_secret_prompt", fake_masked_secret_prompt)
|
||||
monkeypatch.setattr(
|
||||
cfg_mod,
|
||||
"save_env_value",
|
||||
lambda name, value: saved.update({name: value}),
|
||||
)
|
||||
|
||||
with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path)}):
|
||||
results = cfg_mod.migrate_config(interactive=True, quiet=True)
|
||||
|
||||
assert saved["prompt"] == " Test API key: "
|
||||
assert saved["TEST_API_KEY"] == "secret"
|
||||
assert results["env_added"] == ["TEST_API_KEY"]
|
||||
|
||||
|
||||
class TestAnthropicTokenMigration:
|
||||
"""Test that config version 8→9 clears ANTHROPIC_TOKEN."""
|
||||
|
||||
|
|
|
|||
|
|
@ -663,7 +663,7 @@ class TestPromptPluginEnvVars:
|
|||
printed = " ".join(str(c) for c in console.print.call_args_list)
|
||||
assert "langfuse.com" in printed
|
||||
|
||||
def test_secret_uses_getpass(self):
|
||||
def test_secret_uses_masked_prompt(self):
|
||||
from hermes_cli.plugins_cmd import _prompt_plugin_env_vars
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
|
|
@ -674,11 +674,11 @@ class TestPromptPluginEnvVars:
|
|||
}
|
||||
|
||||
with patch("hermes_cli.config.get_env_value", return_value=None), \
|
||||
patch("getpass.getpass", return_value="s3cret") as mock_gp, \
|
||||
patch("hermes_cli.plugins_cmd.masked_secret_prompt", return_value="s3cret") as mock_prompt, \
|
||||
patch("hermes_cli.config.save_env_value"):
|
||||
_prompt_plugin_env_vars(manifest, console)
|
||||
|
||||
mock_gp.assert_called_once()
|
||||
mock_prompt.assert_called_once()
|
||||
|
||||
def test_empty_input_skips(self):
|
||||
from hermes_cli.plugins_cmd import _prompt_plugin_env_vars
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ def _run_prompt(existing_key, choice, new_key="", provider_id="", pconfig_name="
|
|||
|
||||
pconfig = _pconfig(pconfig_name)
|
||||
with patch("builtins.input", return_value=choice), \
|
||||
patch("getpass.getpass", return_value=new_key):
|
||||
patch("hermes_cli.secret_prompt.masked_secret_prompt", return_value=new_key):
|
||||
return m._prompt_api_key(pconfig, existing_key, provider_id=provider_id)
|
||||
|
||||
|
||||
|
|
|
|||
62
tests/hermes_cli/test_secret_prompt.py
Normal file
62
tests/hermes_cli/test_secret_prompt.py
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
import pytest
|
||||
|
||||
from hermes_cli.secret_prompt import _collect_masked_input, masked_secret_prompt
|
||||
|
||||
|
||||
def _run_collect(chars: str):
|
||||
output: list[str] = []
|
||||
iterator = iter(chars)
|
||||
|
||||
def read_char() -> str:
|
||||
return next(iterator, "")
|
||||
|
||||
def write(text: str) -> None:
|
||||
output.append(text)
|
||||
|
||||
value = _collect_masked_input(
|
||||
read_char,
|
||||
write,
|
||||
"API key: ",
|
||||
)
|
||||
return value, "".join(output)
|
||||
|
||||
|
||||
def test_collect_masked_input_shows_feedback_without_echoing_secret():
|
||||
value, output = _run_collect("secret\n")
|
||||
|
||||
assert value == "secret"
|
||||
assert output == "API key: ******\n"
|
||||
assert "secret" not in output
|
||||
|
||||
|
||||
def test_collect_masked_input_handles_backspace():
|
||||
value, output = _run_collect("sec\x7fret\r")
|
||||
|
||||
assert value == "seret"
|
||||
assert output == "API key: ***\b \b***\n"
|
||||
assert "secret" not in output
|
||||
|
||||
|
||||
def test_collect_masked_input_raises_keyboard_interrupt():
|
||||
output: list[str] = []
|
||||
|
||||
with pytest.raises(KeyboardInterrupt):
|
||||
_collect_masked_input(
|
||||
lambda: "\x03",
|
||||
output.append,
|
||||
"API key: ",
|
||||
)
|
||||
|
||||
assert "".join(output) == "API key: \n"
|
||||
|
||||
|
||||
def test_masked_secret_prompt_falls_back_to_getpass_for_non_tty(monkeypatch):
|
||||
class NonTty:
|
||||
def isatty(self):
|
||||
return False
|
||||
|
||||
monkeypatch.setattr("sys.stdin", NonTty())
|
||||
monkeypatch.setattr("sys.stdout", NonTty())
|
||||
monkeypatch.setattr("getpass.getpass", lambda prompt: f"value from {prompt}")
|
||||
|
||||
assert masked_secret_prompt("API key: ") == "value from API key: "
|
||||
|
|
@ -14,7 +14,8 @@ def test_prompt_strips_bracketed_paste_markers(monkeypatch):
|
|||
|
||||
def test_password_prompt_strips_bracketed_paste_markers(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
"getpass.getpass",
|
||||
setup_mod,
|
||||
"masked_secret_prompt",
|
||||
lambda _prompt="": "\x1b[200~secret-token\x1b[201~",
|
||||
)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue