fix(gateway): scope /yolo to the active session

This commit is contained in:
tars 2026-04-10 16:55:51 +09:00 committed by Teknium
parent 68528068ec
commit 9a0dfb5a6d
4 changed files with 185 additions and 10 deletions

View file

@ -4927,14 +4927,21 @@ class GatewayRunner:
return f"🧠 ✓ Reasoning effort set to `{effort}` (this session only)"
async def _handle_yolo_command(self, event: MessageEvent) -> str:
"""Handle /yolo — toggle dangerous command approval bypass."""
current = bool(os.environ.get("HERMES_YOLO_MODE"))
"""Handle /yolo — toggle dangerous command approval bypass for this session only."""
from tools.approval import (
disable_session_yolo,
enable_session_yolo,
is_session_yolo_enabled,
)
session_key = self._session_key_for_source(event.source)
current = is_session_yolo_enabled(session_key)
if current:
os.environ.pop("HERMES_YOLO_MODE", None)
return "⚠️ YOLO mode **OFF** — dangerous commands will require approval."
disable_session_yolo(session_key)
return "⚠️ YOLO mode **OFF** for this session — dangerous commands will require approval."
else:
os.environ["HERMES_YOLO_MODE"] = "1"
return "⚡ YOLO mode **ON** — all commands auto-approved. Use with caution."
enable_session_yolo(session_key)
return "⚡ YOLO mode **ON** for this session — all commands auto-approved. Use with caution."
async def _handle_verbose_command(self, event: MessageEvent) -> str:
"""Handle /verbose command — cycle tool progress display mode.

View file

@ -0,0 +1,62 @@
"""Tests for gateway /yolo session scoping."""
import os
import pytest
import gateway.run as gateway_run
from gateway.config import Platform
from gateway.platforms.base import MessageEvent
from gateway.session import SessionSource
from tools.approval import clear_session, is_session_yolo_enabled
@pytest.fixture(autouse=True)
def _clean_yolo_state(monkeypatch):
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
clear_session("agent:main:telegram:dm:chat-a")
clear_session("agent:main:telegram:dm:chat-b")
yield
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
clear_session("agent:main:telegram:dm:chat-a")
clear_session("agent:main:telegram:dm:chat-b")
def _make_runner():
runner = object.__new__(gateway_run.GatewayRunner)
runner.session_store = None
runner.config = None
return runner
def _make_event(chat_id: str) -> MessageEvent:
source = SessionSource(
platform=Platform.TELEGRAM,
user_id=f"user-{chat_id}",
chat_id=chat_id,
user_name="tester",
chat_type="dm",
)
return MessageEvent(text="/yolo", source=source)
@pytest.mark.asyncio
async def test_yolo_command_toggles_only_current_session(monkeypatch):
runner = _make_runner()
event_a = _make_event("chat-a")
session_a = runner._session_key_for_source(event_a.source)
session_b = runner._session_key_for_source(_make_event("chat-b").source)
result_on = await runner._handle_yolo_command(event_a)
assert "ON" in result_on
assert is_session_yolo_enabled(session_a) is True
assert is_session_yolo_enabled(session_b) is False
assert os.environ.get("HERMES_YOLO_MODE") is None
result_off = await runner._handle_yolo_command(event_a)
assert "OFF" in result_off
assert is_session_yolo_enabled(session_a) is False
assert os.environ.get("HERMES_YOLO_MODE") is None

View file

@ -10,6 +10,11 @@ from tools.approval import (
check_all_command_guards,
check_dangerous_command,
detect_dangerous_command,
disable_session_yolo,
enable_session_yolo,
is_session_yolo_enabled,
reset_current_session_key,
set_current_session_key,
)
@ -18,10 +23,14 @@ def _clear_approval_state():
approval_module._permanent_approved.clear()
approval_module.clear_session("default")
approval_module.clear_session("test-session")
approval_module.clear_session("session-a")
approval_module.clear_session("session-b")
yield
approval_module._permanent_approved.clear()
approval_module.clear_session("default")
approval_module.clear_session("test-session")
approval_module.clear_session("session-a")
approval_module.clear_session("session-b")
class TestYoloMode:
@ -108,3 +117,67 @@ class TestYoloMode:
result = check_dangerous_command("rm -rf /", "local",
approval_callback=lambda *a: "deny")
assert not result["approved"]
def test_session_scoped_yolo_only_bypasses_current_session(self, monkeypatch):
"""Gateway /yolo should only bypass approvals for the active session."""
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
enable_session_yolo("session-a")
assert is_session_yolo_enabled("session-a") is True
assert is_session_yolo_enabled("session-b") is False
token_a = set_current_session_key("session-a")
try:
approved = check_dangerous_command("rm -rf /", "local")
assert approved["approved"] is True
finally:
reset_current_session_key(token_a)
token_b = set_current_session_key("session-b")
try:
blocked = check_dangerous_command(
"rm -rf /",
"local",
approval_callback=lambda *a: "deny",
)
assert blocked["approved"] is False
finally:
reset_current_session_key(token_b)
disable_session_yolo("session-a")
assert is_session_yolo_enabled("session-a") is False
def test_session_scoped_yolo_bypasses_combined_guard_only_for_current_session(self, monkeypatch):
"""Combined guard should honor session-scoped YOLO without affecting others."""
monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
enable_session_yolo("session-a")
token_a = set_current_session_key("session-a")
try:
approved = check_all_command_guards("rm -rf /", "local")
assert approved["approved"] is True
finally:
reset_current_session_key(token_a)
token_b = set_current_session_key("session-b")
try:
blocked = check_all_command_guards(
"rm -rf /",
"local",
approval_callback=lambda *a: "deny",
)
assert blocked["approved"] is False
finally:
reset_current_session_key(token_b)
def test_clear_session_removes_session_yolo_state(self):
"""Session cleanup must remove YOLO bypass state."""
enable_session_yolo("session-a")
assert is_session_yolo_enabled("session-a") is True
approval_module.clear_session("session-a")
assert is_session_yolo_enabled("session-a") is False

View file

@ -172,6 +172,7 @@ def detect_dangerous_command(command: str) -> tuple:
_lock = threading.Lock()
_pending: dict[str, dict] = {}
_session_approved: dict[str, set] = {}
_session_yolo: set[str] = set()
_permanent_approved: set = set()
# =========================================================================
@ -287,6 +288,35 @@ def approve_session(session_key: str, pattern_key: str):
_session_approved.setdefault(session_key, set()).add(pattern_key)
def enable_session_yolo(session_key: str) -> None:
"""Enable YOLO bypass for a single session key."""
if not session_key:
return
with _lock:
_session_yolo.add(session_key)
def disable_session_yolo(session_key: str) -> None:
"""Disable YOLO bypass for a single session key."""
if not session_key:
return
with _lock:
_session_yolo.discard(session_key)
def is_session_yolo_enabled(session_key: str) -> bool:
"""Return True when YOLO bypass is enabled for a specific session."""
if not session_key:
return False
with _lock:
return session_key in _session_yolo
def is_current_session_yolo_enabled() -> bool:
"""Return True when the active approval session has YOLO bypass enabled."""
return is_session_yolo_enabled(get_current_session_key(default=""))
def is_approved(session_key: str, pattern_key: str) -> bool:
"""Check if a pattern is approved (session-scoped or permanent).
@ -317,6 +347,7 @@ def clear_session(session_key: str):
"""Clear all approvals and pending requests for a session."""
with _lock:
_session_approved.pop(session_key, None)
_session_yolo.discard(session_key)
_pending.pop(session_key, None)
_gateway_notify_cbs.pop(session_key, None)
# Signal ALL blocked threads so they don't hang forever
@ -557,8 +588,9 @@ def check_dangerous_command(command: str, env_type: str,
if env_type in ("docker", "singularity", "modal", "daytona"):
return {"approved": True, "message": None}
# --yolo: bypass all approval prompts
if os.getenv("HERMES_YOLO_MODE"):
# --yolo: bypass all approval prompts. Gateway /yolo is session-scoped;
# CLI --yolo remains process-scoped via the env var for local use.
if os.getenv("HERMES_YOLO_MODE") or is_current_session_yolo_enabled():
return {"approved": True, "message": None}
is_dangerous, pattern_key, description = detect_dangerous_command(command)
@ -658,9 +690,10 @@ def check_all_command_guards(command: str, env_type: str,
if env_type in ("docker", "singularity", "modal", "daytona"):
return {"approved": True, "message": None}
# --yolo or approvals.mode=off: bypass all approval prompts
# --yolo or approvals.mode=off: bypass all approval prompts.
# Gateway /yolo is session-scoped; CLI --yolo remains process-scoped.
approval_mode = _get_approval_mode()
if os.getenv("HERMES_YOLO_MODE") or approval_mode == "off":
if os.getenv("HERMES_YOLO_MODE") or is_current_session_yolo_enabled() or approval_mode == "off":
return {"approved": True, "message": None}
is_cli = os.getenv("HERMES_INTERACTIVE")