mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
fix(env-flags): widen truthy-only session env checks to sibling sites
Build on @aydnOktay's cronjob fix by routing the cronjob check through
the shared 'env_var_enabled' helper in utils.py (same truthy set:
1/true/yes/on) and applying the same semantics to the 8 sibling call
sites that read HERMES_INTERACTIVE / HERMES_GATEWAY_SESSION /
HERMES_EXEC_ASK / HERMES_CRON_SESSION with bare os.getenv() truthy
checks:
- tools/approval.py: _is_gateway_approval_context (2), check_command_safety (2),
check_all_command_guards (3) -- 7 sites total
- tools/terminal_tool.py: _handle_sudo_failure, sudo password prompt -- 2 sites
- tools/skills_tool.py: _is_gateway_surface -- 1 site
Without this, a user who exports HERMES_INTERACTIVE=0 in their shell
still gets interactive sudo prompts, approval prompts, and gateway
skill-install paths -- only the cronjob tool was hardened. Now all
consumers agree on the same false-like values.
Also drops the duplicate _is_truthy_env helper from cronjob_tools.py
in favour of the existing canonical utils.env_var_enabled.
Tests: extend the parametrized regression coverage to all three
session env vars (HERMES_INTERACTIVE / HERMES_GATEWAY_SESSION /
HERMES_EXEC_ASK) symmetrically. tests/tools/test_cronjob_tools.py:
60/60 pass; tests/tools/{approval,terminal_tool,skills_tool,
cron_approval_mode,hardline_blocklist}.py: 378/378 pass.
This commit is contained in:
parent
734aa0f367
commit
931caf2b2d
5 changed files with 40 additions and 24 deletions
|
|
@ -129,6 +129,20 @@ class TestCronjobRequirements:
|
|||
monkeypatch.delenv("HERMES_EXEC_ASK", raising=False)
|
||||
assert check_cronjob_requirements() is False
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"var_name",
|
||||
["HERMES_INTERACTIVE", "HERMES_GATEWAY_SESSION", "HERMES_EXEC_ASK"],
|
||||
)
|
||||
@pytest.mark.parametrize("false_like_value", ["0", "false", "no", "off"])
|
||||
def test_rejects_false_like_any_session_env(
|
||||
self, monkeypatch, var_name, false_like_value
|
||||
):
|
||||
"""All three session env vars share the same truthy semantics."""
|
||||
for v in ("HERMES_INTERACTIVE", "HERMES_GATEWAY_SESSION", "HERMES_EXEC_ASK"):
|
||||
monkeypatch.delenv(v, raising=False)
|
||||
monkeypatch.setenv(var_name, false_like_value)
|
||||
assert check_cronjob_requirements() is False
|
||||
|
||||
|
||||
class TestUnifiedCronjobTool:
|
||||
@pytest.fixture(autouse=True)
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import unicodedata
|
|||
from typing import Optional
|
||||
from hermes_cli.config import cfg_get
|
||||
|
||||
from utils import is_truthy_value
|
||||
from utils import env_var_enabled, is_truthy_value
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -108,9 +108,9 @@ def _is_gateway_approval_context() -> bool:
|
|||
fall through to the gateway branch would submit a pending approval
|
||||
with no listener and block the job indefinitely.
|
||||
"""
|
||||
if os.getenv("HERMES_CRON_SESSION"):
|
||||
if env_var_enabled("HERMES_CRON_SESSION"):
|
||||
return False
|
||||
if os.getenv("HERMES_GATEWAY_SESSION"):
|
||||
if env_var_enabled("HERMES_GATEWAY_SESSION"):
|
||||
return True
|
||||
return bool(_get_session_platform())
|
||||
|
||||
|
|
@ -928,12 +928,12 @@ def check_dangerous_command(command: str, env_type: str,
|
|||
if is_approved(session_key, pattern_key):
|
||||
return {"approved": True, "message": None}
|
||||
|
||||
is_cli = os.getenv("HERMES_INTERACTIVE")
|
||||
is_cli = env_var_enabled("HERMES_INTERACTIVE")
|
||||
is_gateway = _is_gateway_approval_context()
|
||||
|
||||
if not is_cli and not is_gateway:
|
||||
# Cron sessions: respect cron_mode config
|
||||
if os.getenv("HERMES_CRON_SESSION"):
|
||||
if env_var_enabled("HERMES_CRON_SESSION"):
|
||||
if _get_cron_approval_mode() == "deny":
|
||||
return {
|
||||
"approved": False,
|
||||
|
|
@ -947,7 +947,7 @@ def check_dangerous_command(command: str, env_type: str,
|
|||
}
|
||||
return {"approved": True, "message": None}
|
||||
|
||||
if is_gateway or os.getenv("HERMES_EXEC_ASK"):
|
||||
if is_gateway or env_var_enabled("HERMES_EXEC_ASK"):
|
||||
submit_pending(session_key, {
|
||||
"command": command,
|
||||
"pattern_key": pattern_key,
|
||||
|
|
@ -1056,15 +1056,15 @@ def check_all_command_guards(command: str, env_type: str,
|
|||
if is_truthy_value(os.getenv("HERMES_YOLO_MODE")) or is_current_session_yolo_enabled() or approval_mode == "off":
|
||||
return {"approved": True, "message": None}
|
||||
|
||||
is_cli = os.getenv("HERMES_INTERACTIVE")
|
||||
is_cli = env_var_enabled("HERMES_INTERACTIVE")
|
||||
is_gateway = _is_gateway_approval_context()
|
||||
is_ask = os.getenv("HERMES_EXEC_ASK")
|
||||
is_ask = env_var_enabled("HERMES_EXEC_ASK")
|
||||
|
||||
# Preserve the existing non-interactive behavior: outside CLI/gateway/ask
|
||||
# flows, we do not block on approvals and we skip external guard work.
|
||||
if not is_cli and not is_gateway and not is_ask:
|
||||
# Cron sessions: respect cron_mode config
|
||||
if os.getenv("HERMES_CRON_SESSION"):
|
||||
if env_var_enabled("HERMES_CRON_SESSION"):
|
||||
if _get_cron_approval_mode() == "deny":
|
||||
# Run detection to get a description for the block message
|
||||
is_dangerous, _pk, description = detect_dangerous_command(command)
|
||||
|
|
|
|||
|
|
@ -662,14 +662,6 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
|
|||
}
|
||||
|
||||
|
||||
def _is_truthy_env(var_name: str) -> bool:
|
||||
"""Return True only for explicit truthy env values."""
|
||||
value = os.getenv(var_name)
|
||||
if value is None:
|
||||
return False
|
||||
return value.strip().lower() in {"1", "true", "yes", "on"}
|
||||
|
||||
|
||||
def check_cronjob_requirements() -> bool:
|
||||
"""
|
||||
Check if cronjob tools can be used.
|
||||
|
|
@ -677,11 +669,18 @@ def check_cronjob_requirements() -> bool:
|
|||
Available in interactive CLI mode and gateway/messaging platforms.
|
||||
The cron system is internal (JSON file-based scheduler ticked by the gateway),
|
||||
so no external crontab executable is required.
|
||||
|
||||
Session env vars must hold an explicit truthy string (``1``, ``true``,
|
||||
``yes``, ``on``) — false-like values (``0``, ``false``, ``no``, ``off``)
|
||||
leave the tool disabled. Uses the shared ``env_var_enabled`` helper so
|
||||
every consumer of these flags agrees on the truthy set.
|
||||
"""
|
||||
return bool(
|
||||
_is_truthy_env("HERMES_INTERACTIVE")
|
||||
or _is_truthy_env("HERMES_GATEWAY_SESSION")
|
||||
or _is_truthy_env("HERMES_EXEC_ASK")
|
||||
from utils import env_var_enabled
|
||||
|
||||
return (
|
||||
env_var_enabled("HERMES_INTERACTIVE")
|
||||
or env_var_enabled("HERMES_GATEWAY_SESSION")
|
||||
or env_var_enabled("HERMES_EXEC_ASK")
|
||||
)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -78,6 +78,7 @@ from typing import Dict, Any, List, Optional, Set, Tuple
|
|||
|
||||
from tools.registry import registry, tool_error
|
||||
from hermes_cli.config import cfg_get
|
||||
from utils import env_var_enabled
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -365,7 +366,7 @@ def _capture_required_environment_variables(
|
|||
|
||||
|
||||
def _is_gateway_surface() -> bool:
|
||||
if os.getenv("HERMES_GATEWAY_SESSION"):
|
||||
if env_var_enabled("HERMES_GATEWAY_SESSION"):
|
||||
return True
|
||||
from gateway.session_context import get_session_env
|
||||
return bool(get_session_env("HERMES_SESSION_PLATFORM"))
|
||||
|
|
|
|||
|
|
@ -47,6 +47,8 @@ import subprocess
|
|||
from pathlib import Path
|
||||
from typing import Optional, Dict, Any, List
|
||||
|
||||
from utils import env_var_enabled
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
|
@ -360,7 +362,7 @@ def _handle_sudo_failure(output: str, env_type: str) -> str:
|
|||
|
||||
Returns enhanced output if sudo failed in messaging context, else original.
|
||||
"""
|
||||
is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
|
||||
is_gateway = env_var_enabled("HERMES_GATEWAY_SESSION")
|
||||
|
||||
if not is_gateway:
|
||||
return output
|
||||
|
|
@ -868,7 +870,7 @@ def _transform_sudo_command(command: str | None) -> tuple[str | None, str | None
|
|||
if not has_configured_password and not sudo_password and _sudo_nopasswd_works():
|
||||
return command, None
|
||||
|
||||
if not has_configured_password and not sudo_password and os.getenv("HERMES_INTERACTIVE"):
|
||||
if not has_configured_password and not sudo_password and env_var_enabled("HERMES_INTERACTIVE"):
|
||||
sudo_password = _prompt_for_sudo_password(timeout_seconds=45)
|
||||
if sudo_password:
|
||||
_set_cached_sudo_password(sudo_password)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue