fix(security): sanitize env and redact output in quick commands + remove write-only _pending_messages

1. Quick command exec ran in the gateway process's full environment
   without env sanitization or output redaction. A quick command like
   "env" or "printenv" would leak all API keys, OAuth tokens, and
   bot credentials to the messaging user.

   Fix: apply _sanitize_subprocess_env() before exec and
   redact_sensitive_text() on output before returning.

2. GatewayRunner._pending_messages was written on every interrupt
   (lines 1331-1334) but never read or consumed anywhere. The actual
   interrupt delivery uses adapter._pending_messages (a separate dict).
   Removed the write-only accumulation to prevent unbounded growth.
This commit is contained in:
0xbyt4 2026-03-17 22:24:09 +03:00 committed by Teknium
parent 4c57a5b318
commit f6736ced81
2 changed files with 49 additions and 4 deletions

View file

@ -6216,10 +6216,9 @@ class GatewayRunner:
return None
logger.debug("PRIORITY interrupt for session %s", _quick_key)
running_agent.interrupt(event.text)
if _quick_key in self._pending_messages:
self._pending_messages[_quick_key] += "\n" + event.text
else:
self._pending_messages[_quick_key] = event.text
# NOTE: self._pending_messages was write-only (never consumed).
# The actual interrupt message is delivered via adapter._pending_messages
# which is read by _run_agent. Removed to prevent unbounded growth.
return None
# Check for commands
@ -6491,13 +6490,23 @@ class GatewayRunner:
exec_cmd = qcmd.get("command", "")
if exec_cmd:
try:
# Sanitize env to prevent credential leakage —
# quick commands run in the gateway process which
# has all API keys in os.environ.
from tools.environments.local import _sanitize_subprocess_env
sanitized_env = _sanitize_subprocess_env(os.environ.copy())
proc = await asyncio.create_subprocess_shell(
exec_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=sanitized_env,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
output = (stdout or stderr).decode().strip()
# Redact any remaining sensitive patterns in output
if output:
from agent.redact import redact_sensitive_text
output = redact_sensitive_text(output)
return output if output else "Command returned no output."
except asyncio.TimeoutError:
return "Quick command timed out (30s)."

View file

@ -1,4 +1,5 @@
"""Tests for user-defined quick commands that bypass the agent loop."""
import os
import subprocess
from unittest.mock import MagicMock, patch, AsyncMock
from rich.text import Text
@ -159,6 +160,41 @@ class TestGatewayQuickCommands:
result = await runner._handle_message(event)
assert result == "ok"
@pytest.mark.asyncio
async def test_exec_command_does_not_leak_credentials(self):
"""Quick command exec must sanitize env — API keys must not appear in output."""
from gateway.run import GatewayRunner
runner = GatewayRunner.__new__(GatewayRunner)
runner.config = {"quick_commands": {"leak": {"type": "exec", "command": "env"}}}
runner._running_agents = {}
runner._pending_messages = {}
runner._is_user_authorized = MagicMock(return_value=True)
event = self._make_event("leak")
with patch.dict(os.environ, {"OPENROUTER_API_KEY": "sk-or-secret-12345"}):
result = await runner._handle_message(event)
assert "sk-or-secret-12345" not in result, \
"Quick command leaked OPENROUTER_API_KEY — exec runs without env sanitization"
@pytest.mark.asyncio
async def test_exec_command_output_is_redacted(self):
"""Quick command output must redact sensitive patterns before returning."""
from gateway.run import GatewayRunner
runner = GatewayRunner.__new__(GatewayRunner)
runner.config = {"quick_commands": {"token": {"type": "exec", "command": "echo sk-ant-api03-supersecretkey1234567890"}}}
runner._running_agents = {}
runner._pending_messages = {}
runner._is_user_authorized = MagicMock(return_value=True)
event = self._make_event("token")
result = await runner._handle_message(event)
assert "supersecretkey1234567890" not in result, \
"Quick command output not redacted — raw API key returned to user"
@pytest.mark.asyncio
async def test_unsupported_type_returns_error(self):
from gateway.run import GatewayRunner