From 9a0dfb5a6d4f783348bbcab63d272081e7b2ef20 Mon Sep 17 00:00:00 2001 From: tars Date: Fri, 10 Apr 2026 16:55:51 +0900 Subject: [PATCH] fix(gateway): scope /yolo to the active session --- gateway/run.py | 19 +++++--- tests/gateway/test_yolo_command.py | 62 +++++++++++++++++++++++++ tests/tools/test_yolo_mode.py | 73 ++++++++++++++++++++++++++++++ tools/approval.py | 41 +++++++++++++++-- 4 files changed, 185 insertions(+), 10 deletions(-) create mode 100644 tests/gateway/test_yolo_command.py diff --git a/gateway/run.py b/gateway/run.py index 9e9bb8fce..70bc78ecb 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4927,14 +4927,21 @@ class GatewayRunner: return f"🧠 ✓ Reasoning effort set to `{effort}` (this session only)" async def _handle_yolo_command(self, event: MessageEvent) -> str: - """Handle /yolo — toggle dangerous command approval bypass.""" - current = bool(os.environ.get("HERMES_YOLO_MODE")) + """Handle /yolo — toggle dangerous command approval bypass for this session only.""" + from tools.approval import ( + disable_session_yolo, + enable_session_yolo, + is_session_yolo_enabled, + ) + + session_key = self._session_key_for_source(event.source) + current = is_session_yolo_enabled(session_key) if current: - os.environ.pop("HERMES_YOLO_MODE", None) - return "⚠️ YOLO mode **OFF** — dangerous commands will require approval." + disable_session_yolo(session_key) + return "⚠️ YOLO mode **OFF** for this session — dangerous commands will require approval." else: - os.environ["HERMES_YOLO_MODE"] = "1" - return "⚡ YOLO mode **ON** — all commands auto-approved. Use with caution." + enable_session_yolo(session_key) + return "⚡ YOLO mode **ON** for this session — all commands auto-approved. Use with caution." async def _handle_verbose_command(self, event: MessageEvent) -> str: """Handle /verbose command — cycle tool progress display mode. diff --git a/tests/gateway/test_yolo_command.py b/tests/gateway/test_yolo_command.py new file mode 100644 index 000000000..fbdda8f1f --- /dev/null +++ b/tests/gateway/test_yolo_command.py @@ -0,0 +1,62 @@ +"""Tests for gateway /yolo session scoping.""" + +import os + +import pytest + +import gateway.run as gateway_run +from gateway.config import Platform +from gateway.platforms.base import MessageEvent +from gateway.session import SessionSource +from tools.approval import clear_session, is_session_yolo_enabled + + +@pytest.fixture(autouse=True) +def _clean_yolo_state(monkeypatch): + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + clear_session("agent:main:telegram:dm:chat-a") + clear_session("agent:main:telegram:dm:chat-b") + yield + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + clear_session("agent:main:telegram:dm:chat-a") + clear_session("agent:main:telegram:dm:chat-b") + + +def _make_runner(): + runner = object.__new__(gateway_run.GatewayRunner) + runner.session_store = None + runner.config = None + return runner + + +def _make_event(chat_id: str) -> MessageEvent: + source = SessionSource( + platform=Platform.TELEGRAM, + user_id=f"user-{chat_id}", + chat_id=chat_id, + user_name="tester", + chat_type="dm", + ) + return MessageEvent(text="/yolo", source=source) + + +@pytest.mark.asyncio +async def test_yolo_command_toggles_only_current_session(monkeypatch): + runner = _make_runner() + + event_a = _make_event("chat-a") + session_a = runner._session_key_for_source(event_a.source) + session_b = runner._session_key_for_source(_make_event("chat-b").source) + + result_on = await runner._handle_yolo_command(event_a) + + assert "ON" in result_on + assert is_session_yolo_enabled(session_a) is True + assert is_session_yolo_enabled(session_b) is False + assert os.environ.get("HERMES_YOLO_MODE") is None + + result_off = await runner._handle_yolo_command(event_a) + + assert "OFF" in result_off + assert is_session_yolo_enabled(session_a) is False + assert os.environ.get("HERMES_YOLO_MODE") is None diff --git a/tests/tools/test_yolo_mode.py b/tests/tools/test_yolo_mode.py index 7d30adcc6..3df5a078c 100644 --- a/tests/tools/test_yolo_mode.py +++ b/tests/tools/test_yolo_mode.py @@ -10,6 +10,11 @@ from tools.approval import ( check_all_command_guards, check_dangerous_command, detect_dangerous_command, + disable_session_yolo, + enable_session_yolo, + is_session_yolo_enabled, + reset_current_session_key, + set_current_session_key, ) @@ -18,10 +23,14 @@ def _clear_approval_state(): approval_module._permanent_approved.clear() approval_module.clear_session("default") approval_module.clear_session("test-session") + approval_module.clear_session("session-a") + approval_module.clear_session("session-b") yield approval_module._permanent_approved.clear() approval_module.clear_session("default") approval_module.clear_session("test-session") + approval_module.clear_session("session-a") + approval_module.clear_session("session-b") class TestYoloMode: @@ -108,3 +117,67 @@ class TestYoloMode: result = check_dangerous_command("rm -rf /", "local", approval_callback=lambda *a: "deny") assert not result["approved"] + + def test_session_scoped_yolo_only_bypasses_current_session(self, monkeypatch): + """Gateway /yolo should only bypass approvals for the active session.""" + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + + enable_session_yolo("session-a") + assert is_session_yolo_enabled("session-a") is True + assert is_session_yolo_enabled("session-b") is False + + token_a = set_current_session_key("session-a") + try: + approved = check_dangerous_command("rm -rf /", "local") + assert approved["approved"] is True + finally: + reset_current_session_key(token_a) + + token_b = set_current_session_key("session-b") + try: + blocked = check_dangerous_command( + "rm -rf /", + "local", + approval_callback=lambda *a: "deny", + ) + assert blocked["approved"] is False + finally: + reset_current_session_key(token_b) + + disable_session_yolo("session-a") + assert is_session_yolo_enabled("session-a") is False + + def test_session_scoped_yolo_bypasses_combined_guard_only_for_current_session(self, monkeypatch): + """Combined guard should honor session-scoped YOLO without affecting others.""" + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + + enable_session_yolo("session-a") + + token_a = set_current_session_key("session-a") + try: + approved = check_all_command_guards("rm -rf /", "local") + assert approved["approved"] is True + finally: + reset_current_session_key(token_a) + + token_b = set_current_session_key("session-b") + try: + blocked = check_all_command_guards( + "rm -rf /", + "local", + approval_callback=lambda *a: "deny", + ) + assert blocked["approved"] is False + finally: + reset_current_session_key(token_b) + + def test_clear_session_removes_session_yolo_state(self): + """Session cleanup must remove YOLO bypass state.""" + enable_session_yolo("session-a") + assert is_session_yolo_enabled("session-a") is True + + approval_module.clear_session("session-a") + + assert is_session_yolo_enabled("session-a") is False diff --git a/tools/approval.py b/tools/approval.py index 68a53a01c..8ebfc3d3e 100644 --- a/tools/approval.py +++ b/tools/approval.py @@ -172,6 +172,7 @@ def detect_dangerous_command(command: str) -> tuple: _lock = threading.Lock() _pending: dict[str, dict] = {} _session_approved: dict[str, set] = {} +_session_yolo: set[str] = set() _permanent_approved: set = set() # ========================================================================= @@ -287,6 +288,35 @@ def approve_session(session_key: str, pattern_key: str): _session_approved.setdefault(session_key, set()).add(pattern_key) +def enable_session_yolo(session_key: str) -> None: + """Enable YOLO bypass for a single session key.""" + if not session_key: + return + with _lock: + _session_yolo.add(session_key) + + +def disable_session_yolo(session_key: str) -> None: + """Disable YOLO bypass for a single session key.""" + if not session_key: + return + with _lock: + _session_yolo.discard(session_key) + + +def is_session_yolo_enabled(session_key: str) -> bool: + """Return True when YOLO bypass is enabled for a specific session.""" + if not session_key: + return False + with _lock: + return session_key in _session_yolo + + +def is_current_session_yolo_enabled() -> bool: + """Return True when the active approval session has YOLO bypass enabled.""" + return is_session_yolo_enabled(get_current_session_key(default="")) + + def is_approved(session_key: str, pattern_key: str) -> bool: """Check if a pattern is approved (session-scoped or permanent). @@ -317,6 +347,7 @@ def clear_session(session_key: str): """Clear all approvals and pending requests for a session.""" with _lock: _session_approved.pop(session_key, None) + _session_yolo.discard(session_key) _pending.pop(session_key, None) _gateway_notify_cbs.pop(session_key, None) # Signal ALL blocked threads so they don't hang forever @@ -557,8 +588,9 @@ def check_dangerous_command(command: str, env_type: str, if env_type in ("docker", "singularity", "modal", "daytona"): return {"approved": True, "message": None} - # --yolo: bypass all approval prompts - if os.getenv("HERMES_YOLO_MODE"): + # --yolo: bypass all approval prompts. Gateway /yolo is session-scoped; + # CLI --yolo remains process-scoped via the env var for local use. + if os.getenv("HERMES_YOLO_MODE") or is_current_session_yolo_enabled(): return {"approved": True, "message": None} is_dangerous, pattern_key, description = detect_dangerous_command(command) @@ -658,9 +690,10 @@ def check_all_command_guards(command: str, env_type: str, if env_type in ("docker", "singularity", "modal", "daytona"): return {"approved": True, "message": None} - # --yolo or approvals.mode=off: bypass all approval prompts + # --yolo or approvals.mode=off: bypass all approval prompts. + # Gateway /yolo is session-scoped; CLI --yolo remains process-scoped. approval_mode = _get_approval_mode() - if os.getenv("HERMES_YOLO_MODE") or approval_mode == "off": + if os.getenv("HERMES_YOLO_MODE") or is_current_session_yolo_enabled() or approval_mode == "off": return {"approved": True, "message": None} is_cli = os.getenv("HERMES_INTERACTIVE")