From 7255050c99ef0bd9ba54a00dbf760ed79baed75a Mon Sep 17 00:00:00 2001 From: Tranquil-Flow Date: Sat, 23 May 2026 10:06:51 +0000 Subject: [PATCH] feat(skills): add opt-in AST deep diagnostics Add opt-in AST diagnostics for skill review without making Skills Guard stricter by default. - Add hermes skills inspect --ast-deep to scan fetched skill bundles before installation - Add hermes skills audit --deep to scan already-installed hub skills - Keep AST analysis in tools/skills_ast_audit.py, separate from tools/skills_guard.py - Label output as diagnostic hints, not security verdicts - Cover dynamic import/access patterns: importlib, __import__(computed), getattr(computed), and __dict__[computed] This follows the maintainer guidance from closed PR #7436: useful AST-level analysis belongs in an opt-in diagnostic path, not in Skills Guard's default heuristic scan. --- hermes_cli/commands.py | 2 +- hermes_cli/main.py | 10 + hermes_cli/skills_hub.py | 52 +++- tests/tools/test_skills_ast_audit.py | 299 +++++++++++++++++++++++ tools/skills_ast_audit.py | 353 +++++++++++++++++++++++++++ 5 files changed, 704 insertions(+), 12 deletions(-) create mode 100644 tests/tools/test_skills_ast_audit.py create mode 100644 tools/skills_ast_audit.py diff --git a/hermes_cli/commands.py b/hermes_cli/commands.py index 815fb3caa00..f589248621c 100644 --- a/hermes_cli/commands.py +++ b/hermes_cli/commands.py @@ -164,7 +164,7 @@ COMMAND_REGISTRY: list[CommandDef] = [ cli_only=True), CommandDef("skills", "Search, install, inspect, or manage skills", "Tools & Skills", cli_only=True, - subcommands=("search", "browse", "inspect", "install")), + subcommands=("search", "browse", "inspect", "install", "audit")), CommandDef("bundles", "List skill bundles (aliases / for multiple skills)", "Tools & Skills"), CommandDef("cron", "Manage scheduled tasks", "Tools & Skills", diff --git a/hermes_cli/main.py b/hermes_cli/main.py index dbd80b5d407..85cfc16215f 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -12267,6 +12267,11 @@ Examples: "inspect", help="Preview a skill without installing" ) skills_inspect.add_argument("identifier", help="Skill identifier") + skills_inspect.add_argument( + "--ast-deep", + action="store_true", + help="Run AST-level diagnostics on Python files before installing", + ) skills_list = skills_subparsers.add_parser("list", help="List installed skills") skills_list.add_argument( @@ -12301,6 +12306,11 @@ Examples: skills_audit.add_argument( "name", nargs="?", help="Specific skill to audit (default: all)" ) + skills_audit.add_argument( + "--deep", + action="store_true", + help="Run AST-level analysis on Python files (opt-in diagnostic)", + ) skills_uninstall = skills_subparsers.add_parser( "uninstall", help="Remove a hub-installed skill" diff --git a/hermes_cli/skills_hub.py b/hermes_cli/skills_hub.py index b0540705165..9bcc0d17f79 100644 --- a/hermes_cli/skills_hub.py +++ b/hermes_cli/skills_hub.py @@ -630,8 +630,13 @@ def do_install(identifier: str, category: str = "", force: bool = False, c.print("[dim]Use /reset to start a new session now, or --now to activate immediately (invalidates prompt cache).[/]\n") -def do_inspect(identifier: str, console: Optional[Console] = None) -> None: - """Preview a skill's SKILL.md content without installing.""" +def do_inspect(identifier: str, console: Optional[Console] = None, + ast_deep: bool = False) -> None: + """Preview a skill's SKILL.md content without installing. + + When ``ast_deep=True``, also runs AST-level diagnostics against Python + files in the fetched bundle before installation. + """ from tools.skills_hub import GitHubAuth, create_source_router c = console or _console @@ -677,6 +682,11 @@ def do_inspect(identifier: str, console: Optional[Console] = None) -> None: preview += f"\n\n... ({len(lines) - 50} more lines)" c.print(Panel(preview, title="SKILL.md Preview", subtitle="hermes skills install to install")) + if bundle and ast_deep: + from tools.skills_ast_audit import ast_scan_bundle_files, format_ast_report + ast_findings = ast_scan_bundle_files(bundle.files) + c.print(format_ast_report(ast_findings, skill_name=meta.name)) + c.print() @@ -906,8 +916,13 @@ def do_update(name: Optional[str] = None, console: Optional[Console] = None) -> c.print(f"[bold green]Updated {len(updates)} skill(s).[/]\n") -def do_audit(name: Optional[str] = None, console: Optional[Console] = None) -> None: - """Re-run security scan on installed hub skills.""" +def do_audit(name: Optional[str] = None, console: Optional[Console] = None, + deep: bool = False) -> None: + """Re-run security scan on installed hub skills. + + When ``deep=True``, also runs AST-level analysis on Python files + (opt-in diagnostic, not a security gate). + """ from tools.skills_hub import HubLockFile, SKILLS_DIR from tools.skills_guard import scan_skill, format_scan_report @@ -928,6 +943,10 @@ def do_audit(name: Optional[str] = None, console: Optional[Console] = None) -> N c.print(f"\n[bold]Auditing {len(targets)} skill(s)...[/]\n") + ast_scan_skill = format_ast_report = None + if deep: + from tools.skills_ast_audit import ast_scan_skill, format_ast_report + for entry in targets: skill_path = SKILLS_DIR / entry["install_path"] if not skill_path.exists(): @@ -936,6 +955,11 @@ def do_audit(name: Optional[str] = None, console: Optional[Console] = None) -> N result = scan_skill(skill_path, source=entry.get("identifier", entry["source"])) c.print(format_scan_report(result)) + + if deep: + ast_findings = ast_scan_skill(skill_path) + c.print(format_ast_report(ast_findings, skill_name=entry["name"])) + c.print() @@ -1332,7 +1356,7 @@ def skills_command(args) -> None: skip_confirm=getattr(args, "yes", False), name_override=getattr(args, "name", "") or "") elif action == "inspect": - do_inspect(args.identifier) + do_inspect(args.identifier, ast_deep=getattr(args, "ast_deep", False)) elif action == "list": do_list( source_filter=args.source, @@ -1343,7 +1367,8 @@ def skills_command(args) -> None: elif action == "update": do_update(name=getattr(args, "name", None)) elif action == "audit": - do_audit(name=getattr(args, "name", None)) + do_audit(name=getattr(args, "name", None), + deep=getattr(args, "deep", False)) elif action == "uninstall": do_uninstall(args.name) elif action == "reset": @@ -1389,12 +1414,15 @@ def handle_skills_slash(cmd: str, console: Optional[Console] = None) -> None: /skills install openai/skills/skill-creator --force /skills install https://example.com/path/SKILL.md /skills inspect openai/skills/skill-creator + /skills inspect openai/skills/skill-creator --ast-deep /skills list /skills list --source hub /skills check /skills update /skills audit /skills audit my-skill + /skills audit --deep + /skills audit my-skill --deep /skills uninstall my-skill /skills tap list /skills tap add owner/repo @@ -1486,10 +1514,11 @@ def handle_skills_slash(cmd: str, console: Optional[Console] = None) -> None: name_override=name_override, console=c) elif action == "inspect": - if not args: - c.print("[bold red]Usage:[/] /skills inspect \n") + non_flag_args = [arg for arg in args if not arg.startswith("--")] + if not non_flag_args: + c.print("[bold red]Usage:[/] /skills inspect [--ast-deep]\n") return - do_inspect(args[0], console=c) + do_inspect(non_flag_args[0], console=c, ast_deep="--ast-deep" in args) elif action == "list": source_filter = "all" @@ -1509,8 +1538,9 @@ def handle_skills_slash(cmd: str, console: Optional[Console] = None) -> None: do_update(name=name, console=c) elif action == "audit": - name = args[0] if args else None - do_audit(name=name, console=c) + name = args[0] if args and not args[0].startswith("--") else None + deep = "--deep" in args + do_audit(name=name, console=c, deep=deep) elif action == "uninstall": if not args: diff --git a/tests/tools/test_skills_ast_audit.py b/tests/tools/test_skills_ast_audit.py new file mode 100644 index 00000000000..fd6d502f076 --- /dev/null +++ b/tests/tools/test_skills_ast_audit.py @@ -0,0 +1,299 @@ +""" +Tests for tools.skills_ast_audit — the opt-in AST diagnostic scanner. + +These tests verify detection of dynamic import/access patterns that can +bypass line-by-line regex scanning, without crashing on hostile or +pathological input. +""" + +import sys +from pathlib import Path + +from tools.skills_ast_audit import ( + AstFinding, + ast_scan_bundle_files, + ast_scan_file, + ast_scan_skill, + format_ast_report, +) + + +# --------------------------------------------------------------------------- +# Core detection tests +# --------------------------------------------------------------------------- + + +class TestAstScanPython: + """AST scanner detects dynamic import and access patterns.""" + + def test_importlib_import_module_detected(self, tmp_path): + """importlib.import_module() calls are flagged.""" + f = tmp_path / "evil.py" + f.write_text("import importlib\nm = importlib.import_module('os')\n") + findings = ast_scan_file(f) + pids = [f.pattern_id for f in findings] + assert "ast_dynamic_import" in pids + assert "ast_importlib_import" in pids + + def test_importlib_submodule_import_detected(self, tmp_path): + """`import importlib.util` and similar submodules are flagged.""" + f = tmp_path / "evil.py" + f.write_text("import importlib.util\n") + findings = ast_scan_file(f) + pids = [f.pattern_id for f in findings] + assert "ast_importlib_import" in pids + + def test_importlib_submodule_aliased_import_detected(self, tmp_path): + """`import importlib.machinery as m` (aliased submodule) is flagged.""" + f = tmp_path / "evil.py" + f.write_text("import importlib.machinery as m\n") + findings = ast_scan_file(f) + pids = [f.pattern_id for f in findings] + assert "ast_importlib_import" in pids + + def test_from_importlib_import_detected(self, tmp_path): + """`from importlib import import_module` is flagged.""" + f = tmp_path / "evil.py" + f.write_text("from importlib import import_module\n") + findings = ast_scan_file(f) + pids = [f.pattern_id for f in findings] + assert "ast_importlib_import" in pids + + def test_from_importlib_submodule_import_detected(self, tmp_path): + """`from importlib.util import find_spec` is flagged.""" + f = tmp_path / "evil.py" + f.write_text("from importlib.util import find_spec\n") + findings = ast_scan_file(f) + pids = [f.pattern_id for f in findings] + assert "ast_importlib_import" in pids + + def test_importer_lookalike_not_flagged(self, tmp_path): + """`import importer` must NOT match — prefix check is dot-bounded.""" + f = tmp_path / "ok.py" + f.write_text("import importer\n") + findings = ast_scan_file(f) + pids = [f.pattern_id for f in findings] + assert "ast_importlib_import" not in pids + + def test_from_importer_lookalike_not_flagged(self, tmp_path): + """`from importer import something` must NOT match the importlib check.""" + f = tmp_path / "ok.py" + f.write_text("from importer import something\n") + findings = ast_scan_file(f) + pids = [f.pattern_id for f in findings] + assert "ast_importlib_import" not in pids + + def test_dunder_import_with_computed_arg_detected(self, tmp_path): + """__import__ with non-literal argument is flagged.""" + f = tmp_path / "evil.py" + f.write_text("name = 'os'\nm = __import__(name)\n") + findings = ast_scan_file(f) + pids = [f.pattern_id for f in findings] + assert "ast_dynamic_import_computed" in pids + + def test_dunder_dict_computed_key_detected(self, tmp_path): + """__dict__[] access is flagged.""" + f = tmp_path / "evil.py" + f.write_text("key = 'environ'\nval = obj.__dict__[key]\n") + findings = ast_scan_file(f) + pids = [f.pattern_id for f in findings] + assert "ast_dict_access" in pids + + def test_getattr_with_computed_name_detected(self, tmp_path): + """getattr(obj, computed_name) is flagged.""" + f = tmp_path / "evil.py" + f.write_text("name = 'system'\nfn = getattr(os, name)\n") + findings = ast_scan_file(f) + pids = [f.pattern_id for f in findings] + assert "ast_dynamic_getattr" in pids + + def test_syntax_error_handled_gracefully(self, tmp_path): + """Files with syntax errors should not crash the scanner.""" + f = tmp_path / "bad.py" + f.write_text("def broken(\n") + findings = ast_scan_file(f) + assert isinstance(findings, list) + + def test_literal_dunder_import_not_flagged_by_ast(self, tmp_path): + """__import__('os') with literal string is NOT flagged by AST.""" + f = tmp_path / "ok.py" + f.write_text("m = __import__('os')\n") + findings = ast_scan_file(f) + pids = [f.pattern_id for f in findings] + assert "ast_dynamic_import_computed" not in pids + + def test_full_bypass_payload_now_detected(self, tmp_path): + """The exact bypass payload from #7072 should now be caught.""" + payload = """ +import importlib +parts = ['o', 's'] +m = importlib.import_module(''.join(parts)) +e = m.__dict__[''.join(['e','n','v','i','r','o','n'])] +""" + f = tmp_path / "exfil.py" + f.write_text(payload) + findings = ast_scan_file(f) + pids = [f.pattern_id for f in findings] + assert "ast_dynamic_import" in pids + assert "ast_dict_access" in pids + assert "ast_importlib_import" in pids + + def test_non_python_files_return_empty(self, tmp_path): + """AST scan returns empty list for non-.py files.""" + f = tmp_path / "script.sh" + f.write_text("import importlib\nimportlib.import_module('os')\n") + findings = ast_scan_file(f) + assert findings == [] + + def test_scan_handles_recursion_error_gracefully(self, tmp_path): + """Deeply-nested expressions that blow the visitor recursion limit + must not crash the scan — return whatever findings were collected so far.""" + src = "a" + ".x" * 5000 + "\n" + f = tmp_path / "deep.py" + f.write_text(src) + + original_limit = sys.getrecursionlimit() + sys.setrecursionlimit(200) + try: + findings = ast_scan_file(f) + finally: + sys.setrecursionlimit(original_limit) + + assert isinstance(findings, list) + + +# --------------------------------------------------------------------------- +# Directory scanner tests +# --------------------------------------------------------------------------- + + +class TestAstScanSkill: + """Directory-level scanning via ast_scan_skill().""" + + def test_scans_all_py_files_in_tree(self, tmp_path): + """All .py files in a skill directory are scanned recursively.""" + skill = tmp_path / "my-skill" + skill.mkdir() + sub = skill / "subpkg" + sub.mkdir() + + (skill / "main.py").write_text("import importlib\n") + (sub / "utils.py").write_text("import importlib.util\n") + + findings = ast_scan_skill(skill) + pids = [f.pattern_id for f in findings] + # Both files should have importlib findings + assert pids.count("ast_importlib_import") == 2 + + def test_skips_ignored_dirs(self, tmp_path): + """__pycache__, venv, .venv, and node_modules directories are skipped.""" + skill = tmp_path / "my-skill" + skill.mkdir() + for dirname in ("__pycache__", "venv", ".venv", "node_modules"): + ignored = skill / dirname + ignored.mkdir() + (ignored / "cached.py").write_text("import importlib\n") + + findings = ast_scan_skill(skill) + assert findings == [] + + def test_skips_non_existent_dir(self, tmp_path): + """Non-existent directory returns empty list.""" + findings = ast_scan_skill(Path("/nonexistent/skill/path")) + assert findings == [] + + def test_non_dir_path(self, tmp_path): + """A file path (not a directory) returns empty list.""" + f = tmp_path / "not_a_dir.py" + f.write_text("import importlib\n") + findings = ast_scan_skill(f) + assert findings == [] + + +class TestAstScanBundleFiles: + """In-memory bundle scanning for pre-install inspect diagnostics.""" + + def test_scans_python_files_from_bundle(self): + """Python files in source adapter bundle mappings are scanned.""" + findings = ast_scan_bundle_files({ + "SKILL.md": "---\nname: test\n---\n", + "scripts/run.py": "import importlib\n", + "references/readme.md": "import importlib\n", + }) + assert [f.pattern_id for f in findings] == ["ast_importlib_import"] + assert findings[0].file == "scripts/run.py" + + def test_decodes_bytes_bundle_content(self): + """Bundle file content may be bytes; decode with replacement.""" + findings = ast_scan_bundle_files({ + "scripts/run.py": b"from importlib.util import find_spec\n", + }) + assert [f.pattern_id for f in findings] == ["ast_importlib_import"] + + def test_skips_bundle_cache_dirs(self): + """Virtualenv/cache paths in a bundle are ignored.""" + findings = ast_scan_bundle_files({ + "venv/lib/run.py": "import importlib\n", + "__pycache__/cached.py": "import importlib\n", + }) + assert findings == [] + + +# --------------------------------------------------------------------------- +# Report formatting tests +# --------------------------------------------------------------------------- + + +class TestFormatAstReport: + """Rich report formatting.""" + + def test_empty_findings(self): + """Empty findings list produces a clean 'nothing found' message.""" + report = format_ast_report([]) + assert "No AST-level patterns detected" in report + + def test_empty_with_skill_name(self): + """Report with skill name but no findings.""" + report = format_ast_report([], skill_name="test-skill") + assert "test-skill" in report + assert "No AST-level patterns detected" in report + + def test_findings_grouped_by_file(self): + """Findings from the same file appear together.""" + findings = [ + AstFinding( + pattern_id="ast_importlib_import", + severity="medium", + category="obfuscation", + file="main.py", + line=1, + match="import importlib", + description="importlib imported", + ), + AstFinding( + pattern_id="ast_dynamic_import", + severity="high", + category="obfuscation", + file="main.py", + line=3, + match="importlib.import_module()", + description="dynamic import via importlib", + ), + ] + report = format_ast_report(findings) + assert "main.py" in report + assert "importlib imported" in report + assert "dynamic import via importlib" in report + assert "2 finding" in report # summary line + assert "Note: AST findings are diagnostic hints" in report + + def test_severity_summary(self): + """Report header includes severity counts.""" + findings = [ + AstFinding("id1", "high", "x", "f.py", 1, "m", "desc"), + AstFinding("id2", "high", "x", "f.py", 2, "m", "desc"), + AstFinding("id3", "medium", "x", "f.py", 3, "m", "desc"), + ] + report = format_ast_report(findings) + assert "2 high" in report + assert "1 medium" in report diff --git a/tools/skills_ast_audit.py b/tools/skills_ast_audit.py new file mode 100644 index 00000000000..dc32c687c35 --- /dev/null +++ b/tools/skills_ast_audit.py @@ -0,0 +1,353 @@ +""" +AST-level deep audit for skill Python files — opt-in diagnostic, not a security gate. + +This is a standalone diagnostic tool per SECURITY.md spirit: it helps operators +inspect skill code for patterns that *could* enable dynamic import/access +obfuscation, but it is NOT a security boundary. Every pattern flagged here has +legitimate uses. Use your judgment. + +Usage:: + + from tools.skills_ast_audit import ast_scan_skill, format_ast_report + + findings = ast_scan_skill(Path("~/.hermes/skills/some-skill")) + if findings: + print(format_ast_report(findings)) + +CLI integration: ``hermes skills audit --deep`` +""" + +from __future__ import annotations + +import ast +from dataclasses import dataclass +from pathlib import Path +from typing import Mapping, List, Optional, Union + + +# --------------------------------------------------------------------------- +# Data model +# --------------------------------------------------------------------------- + + +@dataclass +class AstFinding: + """A single finding from AST-level analysis.""" + + pattern_id: str + """Short identifier for deduplication and grouping (e.g. 'ast_importlib_import').""" + + severity: str + """One of 'high', 'medium', 'low' — for display only, not a security claim.""" + + category: str + """Grouping label — currently always 'obfuscation'.""" + + file: str + """Relative path to the file containing the finding.""" + + line: int + """1-based line number.""" + + match: str + """The matched source construct (human-readable snippet).""" + + description: str + """Why this pattern is worth reviewing.""" + + +# --------------------------------------------------------------------------- +# Scanner +# --------------------------------------------------------------------------- + +def _ast_scan_python(content: str, rel_path: str) -> List[AstFinding]: + """Detect obfuscation via dynamic imports, attribute access, and string construction. + + Hostile or pathological input (deeply-nested expressions, malformed source) + must not crash the scan. Both ``ast.parse`` and the visitor traversal are + guarded so parse/visit failures degrade gracefully to "no AST findings" + rather than raising. + """ + try: + tree = ast.parse(content) + except (SyntaxError, ValueError, RecursionError): + return [] + + findings: List[AstFinding] = [] + + class _Visitor(ast.NodeVisitor): + def visit_Call(self, node): + # Detect importlib.import_module(...) + if ( + isinstance(node.func, ast.Attribute) + and node.func.attr == "import_module" + ): + findings.append( + AstFinding( + pattern_id="ast_dynamic_import", + severity="high", + category="obfuscation", + file=rel_path, + line=node.lineno, + match="importlib.import_module()", + description="dynamic import via importlib — can load arbitrary modules at runtime", + ) + ) + # Detect __import__ with non-literal argument + if isinstance(node.func, ast.Name) and node.func.id == "__import__": + if node.args and not isinstance(node.args[0], ast.Constant): + findings.append( + AstFinding( + pattern_id="ast_dynamic_import_computed", + severity="high", + category="obfuscation", + file=rel_path, + line=node.lineno, + match="__import__()", + description="__import__ with dynamically constructed module name", + ) + ) + # Detect getattr with computed attribute name + if isinstance(node.func, ast.Name) and node.func.id == "getattr": + if len(node.args) >= 2 and not isinstance( + node.args[1], ast.Constant + ): + findings.append( + AstFinding( + pattern_id="ast_dynamic_getattr", + severity="medium", + category="obfuscation", + file=rel_path, + line=node.lineno, + match="getattr(, )", + description="getattr with dynamically constructed attribute name", + ) + ) + self.generic_visit(node) + + def visit_Subscript(self, node): + # Detect obj.__dict__[] + if ( + isinstance(node.value, ast.Attribute) + and node.value.attr == "__dict__" + ): + if not isinstance(node.slice, ast.Constant): + findings.append( + AstFinding( + pattern_id="ast_dict_access", + severity="high", + category="obfuscation", + file=rel_path, + line=node.lineno, + match="__dict__[]", + description="dynamic attribute access via __dict__ with computed key", + ) + ) + self.generic_visit(node) + + def visit_Import(self, node): + # Flag importlib and any importlib.* submodule. + for alias in node.names: + if alias.name == "importlib" or alias.name.startswith( + "importlib." + ): + findings.append( + AstFinding( + pattern_id="ast_importlib_import", + severity="medium", + category="obfuscation", + file=rel_path, + line=node.lineno, + match=f"import {alias.name}", + description="importlib imported — enables dynamic module loading", + ) + ) + self.generic_visit(node) + + def visit_ImportFrom(self, node): + module = node.module or "" + if module == "importlib" or module.startswith("importlib."): + findings.append( + AstFinding( + pattern_id="ast_importlib_import", + severity="medium", + category="obfuscation", + file=rel_path, + line=node.lineno, + match=f"from {module} import ...", + description="importlib imported — enables dynamic module loading", + ) + ) + self.generic_visit(node) + + try: + _Visitor().visit(tree) + except (RecursionError, ValueError, RuntimeError): + # Visitor traversal can fail on hostile input even when ast.parse + # succeeded (e.g. deeply-nested call/attribute chains). Return + # whatever findings we collected before the failure. + return findings + + return findings + + +def ast_scan_file(file_path: Path, rel_path: Optional[str] = None) -> List[AstFinding]: + """Scan a single Python file and return AST-level findings. + + Args: + file_path: Absolute path to the .py file. + rel_path: Relative path for display (defaults to file_path.name). + + Returns: + List of :class:`AstFinding` — empty if the file isn't Python or scan yields nothing. + """ + if file_path.suffix.lower() != ".py": + return [] + + if rel_path is None: + rel_path = file_path.name + + try: + content = file_path.read_text(encoding="utf-8", errors="replace") + except (OSError, UnicodeDecodeError): + return [] + + return _ast_scan_python(content, rel_path) + + +def ast_scan_skill(skill_path: Path) -> List[AstFinding]: + """Recursively scan all Python files in a skill directory. + + Args: + skill_path: Path to the installed skill directory. + + Returns: + Combined list of :class:`AstFinding` across all .py files. + """ + if not skill_path.is_dir(): + return [] + + all_findings: List[AstFinding] = [] + + for py_file in sorted(skill_path.rglob("*.py")): + # Skip __pycache__ and .venv/venv directories + parts = set(py_file.parent.parts) + if parts & {"__pycache__", ".venv", "venv", "node_modules"}: + continue + try: + rel = py_file.relative_to(skill_path).as_posix() + except ValueError: + rel = py_file.name + all_findings.extend(ast_scan_file(py_file, rel)) + + return all_findings + + +def ast_scan_bundle_files( + files: Mapping[str, Union[str, bytes]], +) -> List[AstFinding]: + """Scan Python files from an in-memory skill bundle. + + This powers ``hermes skills inspect --ast-deep`` so operators can review + a skill before installing it. The input is the bundle's filename -> content + mapping, as returned by the skills hub source adapters. + """ + all_findings: List[AstFinding] = [] + + for rel_path, content in sorted(files.items()): + path = Path(rel_path) + if path.suffix.lower() != ".py": + continue + if set(path.parts) & {"__pycache__", ".venv", "venv", "node_modules"}: + continue + if isinstance(content, bytes): + text = content.decode("utf-8", errors="replace") + else: + text = str(content) + all_findings.extend(_ast_scan_python(text, rel_path)) + + return all_findings + + +# --------------------------------------------------------------------------- +# Rich formatting +# --------------------------------------------------------------------------- + + +def format_ast_report( + findings: List[AstFinding], + skill_name: str = "", +) -> str: + """Format AST findings as a Rich-markup string. + + Args: + findings: List of findings from :func:`ast_scan_skill`. + skill_name: Optional skill name for the report header. + + Returns: + Rich-markup string suitable for ``console.print()``. + """ + if not findings: + header = ( + f"[bold]AST Deep Scan: {skill_name}[/]" + if skill_name + else "[bold]AST Deep Scan[/]" + ) + return f"{header}\n[dim green]No AST-level patterns detected.[/]" + + lines: List[str] = [] + severity_order = {"high": 0, "medium": 1, "low": 2} + findings_sorted = sorted( + findings, + key=lambda f: ( + severity_order.get(f.severity, 99), + f.file, + f.line, + ), + ) + + if skill_name: + lines.append(f"[bold]AST Deep Scan: {skill_name}[/]") + else: + lines.append("[bold]AST Deep Scan[/]") + + total = len(findings_sorted) + high_count = sum(1 for f in findings_sorted if f.severity == "high") + med_count = sum(1 for f in findings_sorted if f.severity == "medium") + low_count = sum(1 for f in findings_sorted if f.severity == "low") + + summary_parts = [] + if high_count: + summary_parts.append(f"[bold red]{high_count} high[/]") + if med_count: + summary_parts.append(f"[yellow]{med_count} medium[/]") + if low_count: + summary_parts.append(f"[dim]{low_count} low[/]") + lines.append( + f"[dim]{total} finding(s)[/] — " + + ", ".join(summary_parts) + if summary_parts + else f"[dim]{total} finding(s)[/]" + ) + lines.append("") + + current_file = None + for f in findings_sorted: + if f.file != current_file: + current_file = f.file + lines.append(f" [bold cyan]{f.file}[/]") + sev_color = {"high": "bold red", "medium": "yellow", "low": "dim"}.get( + f.severity, "dim" + ) + lines.append( + f" L{f.line:>4} [{sev_color}]{f.severity:6}[/] {f.description}" + ) + lines.append(f" [dim]{f.match}[/]") + + lines.append("") + lines.append( + "[dim]Note: AST findings are diagnostic hints, not security verdicts. " + "Review each pattern in context.[/]" + ) + + return "\n".join(lines)