Merge remote-tracking branch 'origin/main' into sid/types-and-lints

# Conflicts:
#	gateway/platforms/base.py
#	gateway/platforms/qqbot/adapter.py
#	gateway/platforms/slack.py
#	hermes_cli/main.py
#	scripts/batch_runner.py
#	tools/skills_tool.py
#	uv.lock
This commit is contained in:
alt-glitch 2026-04-21 20:28:45 +05:30
commit a9ed7cb3b4
117 changed files with 7791 additions and 611 deletions

View file

@ -44,16 +44,59 @@ def _get_allowed() -> set[str]:
_config_passthrough: frozenset[str] | None = None
def _is_hermes_provider_credential(name: str) -> bool:
"""True if ``name`` is a Hermes-managed provider credential (API key,
token, or similar) per ``_HERMES_PROVIDER_ENV_BLOCKLIST``.
Skill-declared ``required_environment_variables`` frontmatter must
not be able to override this list that was the bypass in
GHSA-rhgp-j443-p4rf where a malicious skill registered
``ANTHROPIC_TOKEN`` / ``OPENAI_API_KEY`` as passthrough and received
the credential in the ``execute_code`` child process, defeating the
sandbox's scrubbing guarantee.
Non-Hermes API keys (TENOR_API_KEY, NOTION_TOKEN, etc.) are NOT
in the blocklist and remain legitimately registerable skills that
wrap third-party APIs still work.
"""
try:
from tools.environments.local import _HERMES_PROVIDER_ENV_BLOCKLIST
except Exception:
return False
return name in _HERMES_PROVIDER_ENV_BLOCKLIST
def register_env_passthrough(var_names: Iterable[str]) -> None:
    """Register environment variable names as allowed in sandboxed environments.

    Typically called when a skill declares ``required_environment_variables``.

    Variables that are Hermes-managed provider credentials (from
    ``_HERMES_PROVIDER_ENV_BLOCKLIST``) are rejected here to preserve
    the ``execute_code`` sandbox's credential-scrubbing guarantee per
    GHSA-rhgp-j443-p4rf. A skill that needs to talk to a Hermes-managed
    provider should do so via the agent's main-process tools (web_search,
    web_extract, etc.) where the credential remains safely in the main
    process.

    Non-Hermes third-party API keys (TENOR_API_KEY, NOTION_TOKEN, etc.)
    pass through normally — they were never in the sandbox scrub list.
    """
    for name in var_names:
        name = name.strip()
        if not name:
            continue
        # Refuse Hermes provider credentials: skill frontmatter must not be
        # able to re-expose what the sandbox deliberately scrubs.
        if _is_hermes_provider_credential(name):
            logger.warning(
                "env passthrough: refusing to register Hermes provider "
                "credential %r (blocked by _HERMES_PROVIDER_ENV_BLOCKLIST). "
                "Skills must not override the execute_code sandbox's "
                "credential scrubbing; see GHSA-rhgp-j443-p4rf.",
                name,
            )
            continue
        _get_allowed().add(name)
        logger.debug("env passthrough: registered %s", name)
def _load_config_passthrough() -> frozenset[str]:

View file

@ -213,6 +213,77 @@ def _make_run_env(env: dict) -> dict:
return run_env
def _read_terminal_shell_init_config() -> tuple[list[str], bool]:
"""Return (shell_init_files, auto_source_bashrc) from config.yaml.
Best-effort returns sensible defaults on any failure so terminal
execution never breaks because the config file is unreadable.
"""
try:
from hermes_cli.config import load_config
cfg = load_config() or {}
terminal_cfg = cfg.get("terminal") or {}
files = terminal_cfg.get("shell_init_files") or []
if not isinstance(files, list):
files = []
auto_bashrc = bool(terminal_cfg.get("auto_source_bashrc", True))
return [str(f) for f in files if f], auto_bashrc
except Exception:
return [], True
def _resolve_shell_init_files() -> list[str]:
    """Resolve the list of files to source before the login-shell snapshot.

    Expands ``~`` and ``${VAR}`` references and drops anything that doesn't
    exist on disk, so a missing ``~/.bashrc`` never breaks the snapshot.
    The ``auto_source_bashrc`` path runs only when the user hasn't supplied
    an explicit list — once they have, Hermes trusts them.
    """
    explicit, auto_bashrc = _read_terminal_shell_init_config()
    if explicit:
        candidates = list(explicit)
    elif auto_bashrc and not _IS_WINDOWS:
        # Bash's login-shell invocation does NOT source ~/.bashrc by
        # default, so tools like nvm / asdf / pyenv that self-install
        # there stay invisible to the snapshot without this nudge.
        candidates = ["~/.bashrc"]
    else:
        candidates = []

    resolved: list[str] = []
    for raw in candidates:
        try:
            expanded = os.path.expandvars(os.path.expanduser(raw))
        except Exception:
            continue
        if expanded and os.path.isfile(expanded):
            resolved.append(expanded)
    return resolved
def _prepend_shell_init(cmd_string: str, files: list[str]) -> str:
"""Prepend ``source <file>`` lines (guarded + silent) to a bash script.
Each file is wrapped so a failing rc file doesn't abort the whole
bootstrap: ``set +e`` keeps going on errors, ``2>/dev/null`` hides
noisy prompts, and ``|| true`` neutralises the exit status.
"""
if not files:
return cmd_string
prelude_parts = ["set +e"]
for path in files:
# shlex.quote isn't available here without an import; the files list
# comes from os.path.expanduser output so it's a concrete absolute
# path. Escape single quotes defensively anyway.
safe = path.replace("'", "'\\''")
prelude_parts.append(f"[ -r '{safe}' ] && . '{safe}' 2>/dev/null || true")
prelude = "\n".join(prelude_parts) + "\n"
return prelude + cmd_string
class LocalEnvironment(BaseEnvironment):
"""Run commands directly on the host machine.
@ -255,6 +326,16 @@ class LocalEnvironment(BaseEnvironment):
timeout: int = 120,
stdin_data: str | None = None) -> subprocess.Popen:
bash = _find_bash()
# For login-shell invocations (used by init_session to build the
# environment snapshot), prepend sources for the user's bashrc /
# custom init files so tools registered outside bash_profile
# (nvm, asdf, pyenv, …) end up on PATH in the captured snapshot.
# Non-login invocations are already sourcing the snapshot and
# don't need this.
if login:
init_files = _resolve_shell_init_files()
if init_files:
cmd_string = _prepend_shell_init(cmd_string, init_files)
args = [bash, "-l", "-c", cmd_string] if login else [bash, "-c", cmd_string]
run_env = _make_run_env(self.env)

View file

@ -35,6 +35,13 @@ from pathlib import Path
from hermes_constants import get_hermes_home
from tools.binary_extensions import BINARY_EXTENSIONS
from agent.file_safety import (
build_write_denied_paths,
build_write_denied_prefixes,
get_safe_write_root as _shared_get_safe_write_root,
is_write_denied as _shared_is_write_denied,
)
# ---------------------------------------------------------------------------
# Write-path deny list — blocks writes to sensitive system/credential files
@ -42,41 +49,9 @@ from tools.binary_extensions import BINARY_EXTENSIONS
_HOME = str(Path.home())
WRITE_DENIED_PATHS = {
os.path.realpath(p) for p in [
os.path.join(_HOME, ".ssh", "authorized_keys"),
os.path.join(_HOME, ".ssh", "id_rsa"),
os.path.join(_HOME, ".ssh", "id_ed25519"),
os.path.join(_HOME, ".ssh", "config"),
str(get_hermes_home() / ".env"),
os.path.join(_HOME, ".bashrc"),
os.path.join(_HOME, ".zshrc"),
os.path.join(_HOME, ".profile"),
os.path.join(_HOME, ".bash_profile"),
os.path.join(_HOME, ".zprofile"),
os.path.join(_HOME, ".netrc"),
os.path.join(_HOME, ".pgpass"),
os.path.join(_HOME, ".npmrc"),
os.path.join(_HOME, ".pypirc"),
"/etc/sudoers",
"/etc/passwd",
"/etc/shadow",
]
}
WRITE_DENIED_PATHS = build_write_denied_paths(_HOME)
WRITE_DENIED_PREFIXES = [
os.path.realpath(p) + os.sep for p in [
os.path.join(_HOME, ".ssh"),
os.path.join(_HOME, ".aws"),
os.path.join(_HOME, ".gnupg"),
os.path.join(_HOME, ".kube"),
"/etc/sudoers.d",
"/etc/systemd",
os.path.join(_HOME, ".docker"),
os.path.join(_HOME, ".azure"),
os.path.join(_HOME, ".config", "gh"),
]
]
WRITE_DENIED_PREFIXES = build_write_denied_prefixes(_HOME)
def _get_safe_write_root() -> Optional[str]:
@ -87,33 +62,12 @@ def _get_safe_write_root() -> Optional[str]:
not on the static deny list. Opt-in hardening for gateway/messaging
deployments that should only touch a workspace checkout.
"""
root = os.getenv("HERMES_WRITE_SAFE_ROOT", "")
if not root:
return None
try:
return os.path.realpath(os.path.expanduser(root))
except Exception:
return None
return _shared_get_safe_write_root()
def _is_write_denied(path: str) -> bool:
    """Return True if ``path`` is on the write deny list.

    Delegates to the shared implementation in ``agent.file_safety``
    (``_shared_is_write_denied``), which covers the static deny list,
    the denied-prefix checks, and the optional HERMES_WRITE_SAFE_ROOT
    sandbox that the inline implementation here used to perform.
    """
    return _shared_is_write_denied(path)
# =============================================================================
@ -784,12 +738,14 @@ class ShellFileOperations(FileOperations):
content, old_string, new_string, replace_all
)
if error:
return PatchResult(error=error)
if match_count == 0:
return PatchResult(error=f"Could not find match for old_string in {path}")
if error or match_count == 0:
err_msg = error or f"Could not find match for old_string in {path}"
try:
from tools.fuzzy_match import format_no_match_hint
err_msg += format_no_match_hint(err_msg, match_count, old_string, content)
except Exception:
pass
return PatchResult(error=err_msg)
# Write back
write_result = self.write_file(path, new_content)
if write_result.error:

View file

@ -7,6 +7,9 @@ import logging
import os
import threading
from pathlib import Path
from typing import Optional
from agent.file_safety import get_read_block_error
from tools.binary_extensions import has_binary_extension
from tools.file_operations import ShellFileOperations
from agent.redact import redact_sensitive_text
@ -373,24 +376,9 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
# ── Hermes internal path guard ────────────────────────────────
# Prevent prompt injection via catalog or hub metadata files.
from hermes_constants import get_hermes_home as _get_hh
_hermes_home = _get_hh().resolve()
_blocked_dirs = [
_hermes_home / "skills" / ".hub" / "index-cache",
_hermes_home / "skills" / ".hub",
]
for _blocked in _blocked_dirs:
try:
_resolved.relative_to(_blocked)
return json.dumps({
"error": (
f"Access denied: {path} is an internal Hermes cache file "
"and cannot be read directly to prevent prompt injection. "
"Use the skills_list or skill_view tools instead."
)
})
except ValueError:
pass
block_error = get_read_block_error(path)
if block_error:
return json.dumps({"error": block_error})
# ── Dedup check ───────────────────────────────────────────────
# If we already read this exact (path, offset, limit) and the
@ -682,8 +670,11 @@ def patch_tool(mode: str = "replace", path: str = None, old_string: str = None,
result_json = json.dumps(result_dict, ensure_ascii=False)
# Hint when old_string not found — saves iterations where the agent
# retries with stale content instead of re-reading the file.
# Suppressed when patch_replace already attached a rich "Did you mean?"
# snippet (which is strictly more useful than the generic hint).
if result_dict.get("error") and "Could not find" in str(result_dict["error"]):
result_json += "\n\n[Hint: old_string not found. Use read_file to verify the current content, or search_files to locate the text.]"
if "Did you mean one of these sections?" not in str(result_dict["error"]):
result_json += "\n\n[Hint: old_string not found. Use read_file to verify the current content, or search_files to locate the text.]"
return result_json
except Exception as e:
return tool_error(str(e))

View file

@ -619,3 +619,86 @@ def _map_normalized_positions(original: str, normalized: str,
original_matches.append((orig_start, min(orig_end, len(original))))
return original_matches
def find_closest_lines(old_string: str, content: str, context_lines: int = 2, max_results: int = 3) -> str:
    """Locate the lines of ``content`` most similar to ``old_string``.

    Used to build "did you mean?" feedback after a failed exact match.
    Returns a formatted, line-numbered excerpt (multiple candidates joined
    by ``---``), or an empty string when nothing is similar enough.
    """
    if not old_string or not content:
        return ""
    needle_lines = old_string.splitlines()
    haystack = content.splitlines()
    if not needle_lines or not haystack:
        return ""

    # Anchor the search on the first non-blank line of old_string.
    anchor = needle_lines[0].strip()
    if not anchor:
        nonblank = [ln.strip() for ln in needle_lines if ln.strip()]
        if not nonblank:
            return ""
        anchor = nonblank[0]

    # Similarity score of every non-blank content line vs the anchor;
    # keep only lines that clear a minimum similarity bar.
    ranked = [
        (SequenceMatcher(None, anchor, line.strip()).ratio(), idx)
        for idx, line in enumerate(haystack)
        if line.strip()
    ]
    ranked = [pair for pair in ranked if pair[0] > 0.3]
    if not ranked:
        return ""
    ranked.sort(key=lambda pair: -pair[0])

    snippets: list[str] = []
    emitted: set[tuple[int, int]] = set()
    for _, line_idx in ranked[:max_results]:
        lo = max(0, line_idx - context_lines)
        hi = min(len(haystack), line_idx + len(needle_lines) + context_lines)
        if (lo, hi) in emitted:
            continue
        emitted.add((lo, hi))
        numbered = [f"{n + 1:4d}| {haystack[n]}" for n in range(lo, hi)]
        snippets.append("\n".join(numbered))

    if not snippets:
        return ""
    return "\n---\n".join(snippets)
def format_no_match_hint(error: Optional[str], match_count: int,
                         old_string: str, content: str) -> str:
    """Return a ``'\\n\\nDid you mean...'`` snippet for plain no-match errors.

    Gated so the hint only fires for actual "old_string not found"
    failures. Ambiguous-match ("Found N matches"), escape-drift, and
    identical-strings errors all arrive with ``match_count == 0``, but a
    "did you mean?" snippet would mislead there — those failed for
    unrelated reasons.

    Returns an empty string when there's nothing useful to append.
    """
    if match_count != 0 or not error:
        return ""
    if not error.startswith("Could not find"):
        return ""
    suggestion = find_closest_lines(old_string, content)
    if not suggestion:
        return ""
    return "\n\nDid you mean one of these sections?\n" + suggestion

View file

@ -34,7 +34,11 @@ import httpx
from tools.debug_helpers import DebugSession
from tools.managed_tool_gateway import resolve_managed_tool_gateway
from tools.tool_backend_helpers import managed_nous_tools_enabled, prefers_gateway
from tools.tool_backend_helpers import (
fal_key_is_configured,
managed_nous_tools_enabled,
prefers_gateway,
)
logger = logging.getLogger(__name__)
@ -287,7 +291,7 @@ _managed_fal_client_lock = threading.Lock()
def _resolve_managed_fal_gateway():
    """Return managed fal-queue gateway config when the user prefers the
    gateway or direct FAL credentials are absent.

    With a configured FAL_KEY and no gateway preference for image_gen,
    returns None so the direct client is used.
    """
    if fal_key_is_configured() and not prefers_gateway("image_gen"):
        return None
    return resolve_managed_tool_gateway("fal-queue")
@ -630,7 +634,7 @@ def image_generate_tool(
if not prompt or not isinstance(prompt, str) or len(prompt.strip()) == 0:
raise ValueError("Prompt is required and must be a non-empty string")
if not (os.getenv("FAL_KEY") or _resolve_managed_fal_gateway()):
if not (fal_key_is_configured() or _resolve_managed_fal_gateway()):
message = "FAL_KEY environment variable not set"
if managed_nous_tools_enabled():
message += " and managed FAL gateway is unavailable"
@ -741,7 +745,7 @@ def image_generate_tool(
def check_fal_api_key() -> bool:
    """True if the FAL.ai API key (direct or managed gateway) is available."""
    return bool(fal_key_is_configured() or _resolve_managed_fal_gateway())
def check_image_generation_requirements() -> bool:

View file

@ -1249,9 +1249,47 @@ _servers: Dict[str, MCPServerTask] = {}
# _CIRCUIT_BREAKER_THRESHOLD consecutive failures, the handler returns
# a "server unreachable" message that tells the model to stop retrying,
# preventing the 90-iteration burn loop described in #10447.
# Reset to 0 on any successful call.
#
# State machine:
# closed — error count below threshold; all calls go through.
# open — threshold reached; calls short-circuit until the
# cooldown elapses.
# half-open — cooldown elapsed; the next call is a probe that
# actually hits the session. Probe success → closed.
# Probe failure → reopens (cooldown re-armed).
#
# ``_server_breaker_opened_at`` records the monotonic timestamp when
# the breaker most recently transitioned into the open state. Use the
# ``_bump_server_error`` / ``_reset_server_error`` helpers to mutate
# this state — they keep the count and timestamp in sync.
_server_error_counts: Dict[str, int] = {}
_server_breaker_opened_at: Dict[str, float] = {}
_CIRCUIT_BREAKER_THRESHOLD = 3
_CIRCUIT_BREAKER_COOLDOWN_SEC = 60.0
def _bump_server_error(server_name: str) -> None:
    """Record one more consecutive failure for ``server_name``.

    Once the count reaches :data:`_CIRCUIT_BREAKER_THRESHOLD`, the
    breaker-open timestamp is (re)stamped so the cooldown clock starts —
    or restarts, for probe failures in the half-open state.
    """
    count = _server_error_counts.get(server_name, 0) + 1
    _server_error_counts[server_name] = count
    if count >= _CIRCUIT_BREAKER_THRESHOLD:
        _server_breaker_opened_at[server_name] = time.monotonic()
def _reset_server_error(server_name: str) -> None:
    """Fully close the circuit breaker for ``server_name``.

    Discards the breaker-open timestamp and zeroes the consecutive-failure
    count. Call this on any unambiguous success signal (successful tool
    call, successful reconnect, manual /mcp refresh).
    """
    _server_breaker_opened_at.pop(server_name, None)
    _server_error_counts[server_name] = 0
# ---------------------------------------------------------------------------
# Auth-failure detection helpers (Task 6 of MCP OAuth consolidation)
@ -1391,15 +1429,25 @@ def _handle_auth_error_and_retry(
break
time.sleep(0.25)
# A successful OAuth recovery is independent evidence that the
# server is viable again, so close the circuit breaker here —
# not only on retry success. Without this, a reconnect
# followed by a failing retry would leave the breaker pinned
# above threshold forever (the retry-exception branch below
# bumps the count again). The post-reset retry still goes
# through _bump_server_error on failure, so a genuinely broken
# server will re-trip the breaker as normal.
_reset_server_error(server_name)
try:
result = retry_call()
try:
parsed = json.loads(result)
if "error" not in parsed:
_server_error_counts[server_name] = 0
_reset_server_error(server_name)
return result
except (json.JSONDecodeError, TypeError):
_server_error_counts[server_name] = 0
_reset_server_error(server_name)
return result
except Exception as retry_exc:
logger.warning(
@ -1410,7 +1458,7 @@ def _handle_auth_error_and_retry(
# No recovery available, or retry also failed: surface a structured
# needs_reauth error. Bumps the circuit breaker so the model stops
# retrying the tool.
_server_error_counts[server_name] = _server_error_counts.get(server_name, 0) + 1
_bump_server_error(server_name)
return json.dumps({
"error": (
f"MCP server '{server_name}' requires re-authentication. "
@ -1612,20 +1660,33 @@ def _make_tool_handler(server_name: str, tool_name: str, tool_timeout: float):
# Circuit breaker: if this server has failed too many times
# consecutively, short-circuit with a clear message so the model
# stops retrying and uses alternative approaches (#10447).
#
# Once the cooldown elapses, the breaker transitions to
# half-open: we let the *next* call through as a probe. On
# success the success-path below resets the breaker; on
# failure the error paths below bump the count again, which
# re-stamps the open-time via _bump_server_error (re-arming
# the cooldown).
if _server_error_counts.get(server_name, 0) >= _CIRCUIT_BREAKER_THRESHOLD:
return json.dumps({
"error": (
f"MCP server '{server_name}' is unreachable after "
f"{_CIRCUIT_BREAKER_THRESHOLD} consecutive failures. "
f"Do NOT retry this tool — use alternative approaches "
f"or ask the user to check the MCP server."
)
}, ensure_ascii=False)
opened_at = _server_breaker_opened_at.get(server_name, 0.0)
age = time.monotonic() - opened_at
if age < _CIRCUIT_BREAKER_COOLDOWN_SEC:
remaining = max(1, int(_CIRCUIT_BREAKER_COOLDOWN_SEC - age))
return json.dumps({
"error": (
f"MCP server '{server_name}' is unreachable after "
f"{_server_error_counts[server_name]} consecutive "
f"failures. Auto-retry available in ~{remaining}s. "
f"Do NOT retry this tool yet — use alternative "
f"approaches or ask the user to check the MCP server."
)
}, ensure_ascii=False)
# Cooldown elapsed → fall through as a half-open probe.
with _lock:
server = _servers.get(server_name)
if not server or not server.session:
_server_error_counts[server_name] = _server_error_counts.get(server_name, 0) + 1
_bump_server_error(server_name)
return json.dumps({
"error": f"MCP server '{server_name}' is not connected"
}, ensure_ascii=False)
@ -1674,11 +1735,11 @@ def _make_tool_handler(server_name: str, tool_name: str, tool_timeout: float):
try:
parsed = json.loads(result)
if "error" in parsed:
_server_error_counts[server_name] = _server_error_counts.get(server_name, 0) + 1
_bump_server_error(server_name)
else:
_server_error_counts[server_name] = 0 # success — reset
_reset_server_error(server_name) # success — reset
except (json.JSONDecodeError, TypeError):
_server_error_counts[server_name] = 0 # non-JSON = success
_reset_server_error(server_name) # non-JSON = success
return result
except InterruptedError:
return _interrupted_call_result()
@ -1693,7 +1754,7 @@ def _make_tool_handler(server_name: str, tool_name: str, tool_timeout: float):
if recovered is not None:
return recovered
_server_error_counts[server_name] = _server_error_counts.get(server_name, 0) + 1
_bump_server_error(server_name)
logger.error(
"MCP tool %s/%s call failed: %s",
server_name, tool_name, exc,

View file

@ -293,10 +293,16 @@ def _validate_operations(
)
if count == 0:
label = f"'{hunk.context_hint}'" if hunk.context_hint else "(no hint)"
errors.append(
msg = (
f"{op.file_path}: hunk {label} not found"
+ (f"{match_error}" if match_error else "")
)
try:
from tools.fuzzy_match import format_no_match_hint
msg += format_no_match_hint(match_error, count, search_pattern, simulated)
except Exception:
pass
errors.append(msg)
else:
# Advance simulation so subsequent hunks validate correctly.
# Reuse the result from the call above — no second fuzzy run.
@ -540,7 +546,13 @@ def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
error = None
if error:
return False, f"Could not apply hunk: {error}"
err_msg = f"Could not apply hunk: {error}"
try:
from tools.fuzzy_match import format_no_match_hint
err_msg += format_no_match_hint(error, 0, search_pattern, new_content)
except Exception:
pass
return False, err_msg
else:
# Addition-only hunk (no context or removed lines).
# Insert at the location indicated by the context hint, or at end of file.

View file

@ -449,9 +449,15 @@ def _patch_skill(
if match_error:
# Show a short preview of the file so the model can self-correct
preview = content[:500] + ("..." if len(content) > 500 else "")
err_msg = match_error
try:
from tools.fuzzy_match import format_no_match_hint
err_msg += format_no_match_hint(match_error, match_count, old_string, content)
except Exception:
pass
return {
"success": False,
"error": match_error,
"error": err_msg,
"file_preview": preview,
}

View file

@ -507,13 +507,33 @@ def _get_disabled_skill_names() -> Set[str]:
return get_disabled_skill_names()
def _get_session_platform() -> str:
"""Resolve the current platform from gateway session context.
Mirrors the platform-resolution logic in
``agent.skill_utils.get_disabled_skill_names`` so that
``_is_skill_disabled`` respects ``HERMES_SESSION_PLATFORM``.
"""
try:
from gateway.session_context import get_session_env
return get_session_env("HERMES_SESSION_PLATFORM") or ""
except Exception:
return ""
def _is_skill_disabled(name: str, platform: str = None) -> bool:
"""Check if a skill is disabled in config."""
"""Check if a skill is disabled in config.
Resolves the active platform from (in order of precedence):
1. Explicit ``platform`` argument
2. ``HERMES_PLATFORM`` environment variable
3. ``HERMES_SESSION_PLATFORM`` from gateway session context
"""
try:
from hermes_cli.config import load_config
config = load_config()
skills_cfg = config.get("skills", {})
resolved_platform = platform or os.getenv("HERMES_PLATFORM")
resolved_platform = platform or os.getenv("HERMES_PLATFORM") or _get_session_platform()
if resolved_platform:
platform_disabled = skills_cfg.get("platform_disabled", {}).get(resolved_platform)
if platform_disabled is not None:

View file

@ -114,22 +114,44 @@ _cached_sudo_password: str = ""
# Optional UI callbacks for interactive prompts. When set, these are called
# instead of the default /dev/tty or input() readers. The CLI registers these
# so prompts route through prompt_toolkit's event loop.
# _sudo_password_callback() -> str (return password or "" to skip)
# _approval_callback(command, description) -> str ("once"/"session"/"always"/"deny")
_sudo_password_callback = None
_approval_callback = None
# Callback slots used by the approval prompt and sudo password prompt
# routines. Stored in thread-local state so overlapping ACP sessions —
# each running in its own ThreadPoolExecutor thread — don't stomp on
# each other's callbacks. See GHSA-qg5c-hvr5-hjgr.
#
# CLI mode is single-threaded, so each thread (the only one) holds its
# own callback exactly like before. Gateway mode resolves approvals via
# the per-session queue in tools.approval, not through these callbacks,
# so it's unaffected.
import threading
_callback_tls = threading.local()
def _get_sudo_password_callback():
    """Return this thread's registered sudo-password callback, or None."""
    return getattr(_callback_tls, "sudo_password", None)
def _get_approval_callback():
    """Return this thread's registered approval callback, or None."""
    return getattr(_callback_tls, "approval", None)
def set_sudo_password_callback(cb):
    """Register a callback for sudo password prompts (used by CLI).

    Per-thread scope — ACP sessions that run concurrently in a
    ThreadPoolExecutor each have their own callback slot.
    """
    _callback_tls.sudo_password = cb
def set_approval_callback(cb):
    """Register a callback for dangerous command approval prompts.

    Per-thread scope — ACP sessions that run concurrently in a
    ThreadPoolExecutor each have their own callback slot. See
    GHSA-qg5c-hvr5-hjgr.
    """
    _callback_tls.approval = cb
# =============================================================================
# Dangerous Command Approval System
@ -144,7 +166,7 @@ from tools.approval import (
def _check_all_guards(command: str, env_type: str) -> dict:
    """Delegate to the consolidated guard (tirith + dangerous cmd) using
    this thread's registered CLI approval callback."""
    return _check_all_guards_impl(
        command, env_type, approval_callback=_get_approval_callback()
    )
# Allowlist: characters that can legitimately appear in directory paths.
@ -219,9 +241,10 @@ def _prompt_for_sudo_password(timeout_seconds: int = 45) -> str:
import sys
# Use the registered callback when available (prompt_toolkit-compatible)
if _sudo_password_callback is not None:
_sudo_cb = _get_sudo_password_callback()
if _sudo_cb is not None:
try:
return _sudo_password_callback() or ""
return _sudo_cb() or ""
except Exception:
return ""

View file

@ -119,3 +119,24 @@ def prefers_gateway(config_section: str) -> bool:
except Exception:
pass
return False
def fal_key_is_configured() -> bool:
    """Return True when FAL_KEY is set to a non-whitespace value.

    Consults ``os.environ`` first, then falls back to ``~/.hermes/.env``
    (via ``hermes_cli.config.get_env_value`` when importable) so
    tool-side checks and CLI setup-time checks agree. A whitespace-only
    value is treated as unset everywhere.
    """
    value = os.getenv("FAL_KEY")
    if value is None:
        # CLI paths may run before dotenv is loaded into os.environ —
        # fall back to reading the .env file directly.
        try:
            from hermes_cli.config import get_env_value

            value = get_env_value("FAL_KEY")
        except Exception:
            return False
    return bool(value and value.strip())

View file

@ -79,6 +79,12 @@ def _import_sounddevice():
return sd
def _import_kittentts():
"""Lazy import KittenTTS. Returns the class or raises ImportError."""
from kittentts import KittenTTS
return KittenTTS
# ===========================================================================
# Defaults
# ===========================================================================
@ -88,6 +94,8 @@ DEFAULT_ELEVENLABS_VOICE_ID = "pNInz6obpgDQGcFmaJgB" # Adam
DEFAULT_ELEVENLABS_MODEL_ID = "eleven_multilingual_v2"
DEFAULT_ELEVENLABS_STREAMING_MODEL_ID = "eleven_flash_v2_5"
DEFAULT_OPENAI_MODEL = "gpt-4o-mini-tts"
DEFAULT_KITTENTTS_MODEL = "KittenML/kitten-tts-nano-0.8-int8" # 25MB
DEFAULT_KITTENTTS_VOICE = "Jasper"
DEFAULT_OPENAI_VOICE = "alloy"
DEFAULT_OPENAI_BASE_URL = "https://api.openai.com/v1"
DEFAULT_MINIMAX_MODEL = "speech-2.8-hd"
@ -695,6 +703,15 @@ def _check_neutts_available() -> bool:
return False
def _check_kittentts_available() -> bool:
"""Check if the kittentts engine is importable (installed locally)."""
try:
import importlib.util
return importlib.util.find_spec("kittentts") is not None
except Exception:
return False
def _default_neutts_ref_audio() -> str:
"""Return path to the bundled default voice reference audio."""
return str(Path(__file__).parent / "neutts_samples" / "jo.wav")
@ -758,6 +775,69 @@ def _generate_neutts(text: str, output_path: str, tts_config: Dict[str, Any]) ->
return output_path
# ===========================================================================
# Provider: KittenTTS (local, lightweight)
# ===========================================================================
# Module-level cache for KittenTTS model instance
_kittentts_model_cache: Dict[str, Any] = {}
def _generate_kittentts(text: str, output_path: str, tts_config: Dict[str, Any]) -> str:
    """Generate speech using the KittenTTS local ONNX model.

    KittenTTS is a lightweight TTS engine (25-80MB models) that runs
    entirely on CPU without requiring a GPU or API key.

    Args:
        text: Text to convert to speech.
        output_path: Where to save the audio file.
        tts_config: TTS config dict.

    Returns:
        Path to the saved audio file.
    """
    KittenTTS = _import_kittentts()

    kt_config = tts_config.get("kittentts", {})
    model_name = kt_config.get("model", DEFAULT_KITTENTTS_MODEL)

    # Instantiation is the expensive step — reuse a cached model when
    # this model name was loaded before.
    global _kittentts_model_cache
    if model_name not in _kittentts_model_cache:
        logger.info("[KittenTTS] Loading model: %s", model_name)
        _kittentts_model_cache[model_name] = KittenTTS(model_name)
        logger.info("[KittenTTS] Model loaded successfully")
    model = _kittentts_model_cache[model_name]

    # Synthesis returns a numpy array at 24kHz.
    audio = model.generate(
        text,
        voice=kt_config.get("voice", DEFAULT_KITTENTTS_VOICE),
        speed=kt_config.get("speed", 1.0),
        clean_text=kt_config.get("clean_text", True),
    )

    # Always write a WAV first; convert afterwards if another extension
    # was requested.
    import soundfile as sf
    if output_path.endswith(".wav"):
        wav_path = output_path
    else:
        wav_path = output_path.rsplit(".", 1)[0] + ".wav"
    sf.write(wav_path, audio, 24000)

    if wav_path != output_path:
        ffmpeg = shutil.which("ffmpeg")
        if ffmpeg:
            subprocess.run(
                [ffmpeg, "-i", wav_path, "-y", "-loglevel", "error", output_path],
                check=True,
                timeout=30,
            )
            os.remove(wav_path)
        else:
            # No ffmpeg — rename the WAV to the expected path.
            os.rename(wav_path, output_path)
    return output_path
# ===========================================================================
# Main tool function
# ===========================================================================
@ -877,6 +957,19 @@ def text_to_speech_tool(
logger.info("Generating speech with NeuTTS (local)...")
_generate_neutts(text, file_str, tts_config)
elif provider == "kittentts":
try:
_import_kittentts()
except ImportError:
return json.dumps({
"success": False,
"error": "KittenTTS provider selected but 'kittentts' package not installed. "
"Run 'hermes setup tts' and choose KittenTTS, or install manually: "
"pip install https://github.com/KittenML/KittenTTS/releases/download/0.8.1/kittentts-0.8.1-py3-none-any.whl"
}, ensure_ascii=False)
logger.info("Generating speech with KittenTTS (local, ~25MB)...")
_generate_kittentts(text, file_str, tts_config)
else:
# Default: Edge TTS (free), with NeuTTS as local fallback
edge_available = True
@ -914,9 +1007,9 @@ def text_to_speech_tool(
}, ensure_ascii=False)
# Try Opus conversion for Telegram compatibility
# Edge TTS outputs MP3, NeuTTS outputs WAV — both need ffmpeg conversion
# Edge TTS outputs MP3, NeuTTS/KittenTTS output WAV — all need ffmpeg conversion
voice_compatible = False
if provider in ("edge", "neutts", "minimax", "xai") and not file_str.endswith(".ogg"):
if provider in ("edge", "neutts", "minimax", "xai", "kittentts") and not file_str.endswith(".ogg"):
opus_path = _convert_to_opus(file_str)
if opus_path:
file_str = opus_path
@ -1001,6 +1094,8 @@ def check_tts_requirements() -> bool:
pass
if _check_neutts_available():
return True
if _check_kittentts_available():
return True
return False