"""security-guidance plugin — fast pattern-matched security warnings on file writes. Wires one behaviour: * ``transform_tool_result`` hook — scans the *content being written* by ``write_file`` / ``patch`` / ``skill_manage`` (write/patch modes) for known dangerous code patterns (eval(, pickle.load, yaml.load, os.system, subprocess(shell=True), dangerouslySetInnerHTML, verify=False, ECB, XXE-prone XML parsers, GitHub Actions ``${{ github.event.* }}`` injection, torch.load without ``weights_only=True``, ...). When any pattern matches, the plugin appends a ``⚠️ Security warning`` block to the JSON tool-result string. The file is still written; the model sees the warning in the next turn's tool message and can self-correct. Why not block? Patterns have a non-trivial false-positive rate (``eval(`` in a tokenizer, ``yaml.load`` already wrapped in ``yaml.SafeLoader``, ECB inside a test fixture). Blocking would force every false positive into an approval prompt or an interrupted workflow. Warning is the right severity for layer 1 — the agent reads the warning and either fixes the code or briefly documents why the construct is safe. For block-mode (refuse the write entirely), set ``SECURITY_GUIDANCE_BLOCK=1``. This trades convenience for strictness and is intended for shared dev environments where unsafe-by-default patterns are policy violations. Pattern data lives in ``patterns.py``, forked verbatim from Anthropic's ``claude-plugins-official`` under Apache-2.0. See ``LICENSE`` and ``NOTICE`` in this directory. """ from __future__ import annotations import json import logging import os import re from typing import Any, Dict, List, Optional, Tuple from . import patterns as _patterns logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- # Tool names whose args carry "code being written to disk" we want to scan. # Maps tool name -> (path_arg_name, content_arg_names). For tools with multiple # possible content fields (patch's old/new_string vs raw patch text), we scan # every populated string field. _TARGET_TOOLS: Dict[str, Tuple[str, Tuple[str, ...]]] = { "write_file": ("path", ("content",)), "patch": ("path", ("new_string", "patch")), # skill_manage write_file / patch sub-actions land here. file_path holds # the relative path inside the skill dir; we scan it the same way. "skill_manage": ("file_path", ("file_content", "new_string")), } # Cap on how much content we scan. Above this we skip — pattern matching a # 10 MB blob has poor signal-to-noise and would slow down the agent loop. _MAX_SCAN_BYTES = 256 * 1024 def _block_mode_enabled() -> bool: return os.environ.get("SECURITY_GUIDANCE_BLOCK", "").lower() in {"1", "true", "yes", "on"} def _plugin_disabled() -> bool: return os.environ.get("SECURITY_GUIDANCE_DISABLE", "").lower() in {"1", "true", "yes", "on"} # --------------------------------------------------------------------------- # Scanning # --------------------------------------------------------------------------- # Pre-compile the regex patterns once. Substring patterns stay as plain # strings — ``str.__contains__`` is faster than a regex of literal chars. _COMPILED: List[Dict[str, Any]] = [] for _rule in _patterns.SECURITY_PATTERNS: _entry: Dict[str, Any] = { "ruleName": _rule["ruleName"], "reminder": _rule["reminder"], "path_filter": _rule.get("path_filter"), "path_check": _rule.get("path_check"), "substrings": tuple(_rule.get("substrings", ())), "regex": None, } _re_src = _rule.get("regex") if _re_src: try: _entry["regex"] = re.compile(_re_src) except re.error as _err: logger.warning( "security-guidance: skipping rule %s — invalid regex %r: %s", _rule["ruleName"], _re_src, _err, ) continue _COMPILED.append(_entry) def _scan_content(path: str, content: str) -> List[Tuple[str, str]]: """Return [(ruleName, reminder), ...] for every pattern that matches. ``path`` is used by per-rule path filters (path_filter / path_check). Each rule fires at most once per call — multiple matches of the same rule collapse into a single warning entry. """ if not content or len(content.encode("utf-8", errors="ignore")) > _MAX_SCAN_BYTES: return [] hits: List[Tuple[str, str]] = [] for entry in _COMPILED: # path_check: rule fires PURELY on path match (no content regex). Used # for blanket "you're editing a sensitive file, here are reminders" # warnings — github_actions_workflow is the canonical example. path_check = entry.get("path_check") if path_check is not None: try: if path_check(path or ""): hits.append((entry["ruleName"], entry["reminder"])) except Exception: pass # Path-check rules don't also pattern-match content; move on. continue # path_filter: rule is skipped when the path filter returns False # (e.g. Python-only rules skip .js files; eval_injection skips .md) path_filter = entry.get("path_filter") if path_filter is not None: try: if not path_filter(path or ""): continue except Exception: continue matched = False for sub in entry["substrings"]: if sub in content: matched = True break if not matched and entry["regex"] is not None: if entry["regex"].search(content): matched = True if matched: hits.append((entry["ruleName"], entry["reminder"])) return hits def _extract_path_and_content(tool_name: str, args: Any) -> List[Tuple[str, str]]: """Return [(path, content), ...] for a tool call. Empty if nothing to scan.""" spec = _TARGET_TOOLS.get(tool_name) if spec is None or not isinstance(args, dict): return [] path_key, content_keys = spec path = args.get(path_key) or "" if not isinstance(path, str): path = "" out: List[Tuple[str, str]] = [] for ck in content_keys: val = args.get(ck) if isinstance(val, str) and val: out.append((path, val)) return out def _format_warning_block(findings: List[Tuple[str, str]]) -> str: """Render findings into a Markdown block appended to the tool result.""" names = ", ".join(name for name, _ in findings) lines = [ "", "---", f"⚠️ Security guidance — {len(findings)} pattern{'s' if len(findings) != 1 else ''} matched ({names})", "", ] for _, reminder in findings: lines.append(reminder) lines.append("") lines.append( "Pattern matches can be false positives. If the construct is safe in this " "context, briefly document why in a code comment and continue. Otherwise, " "fix the code before moving on." ) return "\n".join(lines) # --------------------------------------------------------------------------- # Hooks # --------------------------------------------------------------------------- def _scan_args(tool_name: str, args: Any) -> List[Tuple[str, str]]: """Common scan path used by both pre_tool_call (block mode) and transform_tool_result (warn mode).""" if _plugin_disabled(): return [] findings: List[Tuple[str, str]] = [] for path, content in _extract_path_and_content(tool_name, args): findings.extend(_scan_content(path, content)) return findings def _on_pre_tool_call( tool_name: str = "", args: Any = None, **_: Any, ) -> Optional[Dict[str, str]]: """In block mode, refuse the write if any pattern matches. Default mode is non-blocking — we return None here and let ``transform_tool_result`` append a warning to the result instead. """ if not _block_mode_enabled(): return None findings = _scan_args(tool_name, args) if not findings: return None return { "action": "block", "message": ( "security-guidance refused this write: " + _format_warning_block(findings) + "\n\nTo override, unset SECURITY_GUIDANCE_BLOCK and retry." ), } def _on_transform_tool_result( tool_name: str = "", args: Any = None, result: Any = None, **_: Any, ) -> Optional[str]: """Warn-mode hook: append a security-warning block to the tool result. Returning a string replaces the result that the model sees in the next turn. Returning None leaves the result unchanged. """ # Block mode handles findings via pre_tool_call; nothing for this hook # to do in that case (the tool didn't run, so there's no result to wrap). if _block_mode_enabled(): return None findings = _scan_args(tool_name, args) if not findings: return None if not isinstance(result, str): return None # Don't decorate error results — the model already has bigger problems. try: parsed = json.loads(result) if isinstance(parsed, dict) and "error" in parsed and len(parsed) <= 2: return None except (ValueError, TypeError): pass return result + "\n\n" + _format_warning_block(findings) def register(ctx) -> None: ctx.register_hook("pre_tool_call", _on_pre_tool_call) ctx.register_hook("transform_tool_result", _on_transform_tool_result)