diff --git a/hermes_cli/main.py b/hermes_cli/main.py index 85cfc16215f..6af77204478 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -12267,11 +12267,6 @@ Examples: "inspect", help="Preview a skill without installing" ) skills_inspect.add_argument("identifier", help="Skill identifier") - skills_inspect.add_argument( - "--ast-deep", - action="store_true", - help="Run AST-level diagnostics on Python files before installing", - ) skills_list = skills_subparsers.add_parser("list", help="List installed skills") skills_list.add_argument( diff --git a/hermes_cli/skills_hub.py b/hermes_cli/skills_hub.py index 9bcc0d17f79..5d39b5202f4 100644 --- a/hermes_cli/skills_hub.py +++ b/hermes_cli/skills_hub.py @@ -630,13 +630,8 @@ def do_install(identifier: str, category: str = "", force: bool = False, c.print("[dim]Use /reset to start a new session now, or --now to activate immediately (invalidates prompt cache).[/]\n") -def do_inspect(identifier: str, console: Optional[Console] = None, - ast_deep: bool = False) -> None: - """Preview a skill's SKILL.md content without installing. - - When ``ast_deep=True``, also runs AST-level diagnostics against Python - files in the fetched bundle before installation. - """ +def do_inspect(identifier: str, console: Optional[Console] = None) -> None: + """Preview a skill's SKILL.md content without installing.""" from tools.skills_hub import GitHubAuth, create_source_router c = console or _console @@ -682,11 +677,6 @@ def do_inspect(identifier: str, console: Optional[Console] = None, preview += f"\n\n... ({len(lines) - 50} more lines)" c.print(Panel(preview, title="SKILL.md Preview", subtitle="hermes skills install to install")) - if bundle and ast_deep: - from tools.skills_ast_audit import ast_scan_bundle_files, format_ast_report - ast_findings = ast_scan_bundle_files(bundle.files) - c.print(format_ast_report(ast_findings, skill_name=meta.name)) - c.print() @@ -920,8 +910,9 @@ def do_audit(name: Optional[str] = None, console: Optional[Console] = None, deep: bool = False) -> None: """Re-run security scan on installed hub skills. - When ``deep=True``, also runs AST-level analysis on Python files - (opt-in diagnostic, not a security gate). + When ``deep=True``, also runs an opt-in AST-level diagnostic on Python + files (review aid only — not a security gate; skills_guard.py verdicts + are unchanged). """ from tools.skills_hub import HubLockFile, SKILLS_DIR from tools.skills_guard import scan_skill, format_scan_report @@ -943,9 +934,8 @@ def do_audit(name: Optional[str] = None, console: Optional[Console] = None, c.print(f"\n[bold]Auditing {len(targets)} skill(s)...[/]\n") - ast_scan_skill = format_ast_report = None if deep: - from tools.skills_ast_audit import ast_scan_skill, format_ast_report + from tools.skills_ast_audit import ast_scan_path, format_ast_report for entry in targets: skill_path = SKILLS_DIR / entry["install_path"] @@ -957,8 +947,7 @@ def do_audit(name: Optional[str] = None, console: Optional[Console] = None, c.print(format_scan_report(result)) if deep: - ast_findings = ast_scan_skill(skill_path) - c.print(format_ast_report(ast_findings, skill_name=entry["name"])) + c.print(format_ast_report(ast_scan_path(skill_path), skill_name=entry["name"])) c.print() @@ -1356,7 +1345,7 @@ def skills_command(args) -> None: skip_confirm=getattr(args, "yes", False), name_override=getattr(args, "name", "") or "") elif action == "inspect": - do_inspect(args.identifier, ast_deep=getattr(args, "ast_deep", False)) + do_inspect(args.identifier) elif action == "list": do_list( source_filter=args.source, @@ -1414,7 +1403,6 @@ def handle_skills_slash(cmd: str, console: Optional[Console] = None) -> None: /skills install openai/skills/skill-creator --force /skills install https://example.com/path/SKILL.md /skills inspect openai/skills/skill-creator - /skills inspect openai/skills/skill-creator --ast-deep /skills list /skills list --source hub /skills check @@ -1514,11 +1502,10 @@ def handle_skills_slash(cmd: str, console: Optional[Console] = None) -> None: name_override=name_override, console=c) elif action == "inspect": - non_flag_args = [arg for arg in args if not arg.startswith("--")] - if not non_flag_args: - c.print("[bold red]Usage:[/] /skills inspect [--ast-deep]\n") + if not args: + c.print("[bold red]Usage:[/] /skills inspect \n") return - do_inspect(non_flag_args[0], console=c, ast_deep="--ast-deep" in args) + do_inspect(args[0], console=c) elif action == "list": source_filter = "all" diff --git a/tests/tools/test_skills_ast_audit.py b/tests/tools/test_skills_ast_audit.py index fd6d502f076..c70d6a1f41c 100644 --- a/tests/tools/test_skills_ast_audit.py +++ b/tests/tools/test_skills_ast_audit.py @@ -1,299 +1,103 @@ -""" -Tests for tools.skills_ast_audit — the opt-in AST diagnostic scanner. - -These tests verify detection of dynamic import/access patterns that can -bypass line-by-line regex scanning, without crashing on hostile or -pathological input. -""" +"""Tests for tools.skills_ast_audit — opt-in AST diagnostic scanner.""" import sys from pathlib import Path -from tools.skills_ast_audit import ( - AstFinding, - ast_scan_bundle_files, - ast_scan_file, - ast_scan_skill, - format_ast_report, -) +from tools.skills_ast_audit import ast_scan_path, format_ast_report -# --------------------------------------------------------------------------- -# Core detection tests -# --------------------------------------------------------------------------- +def _pids(findings): + return [pid for (_f, _l, pid, _d) in findings] -class TestAstScanPython: - """AST scanner detects dynamic import and access patterns.""" - - def test_importlib_import_module_detected(self, tmp_path): - """importlib.import_module() calls are flagged.""" - f = tmp_path / "evil.py" - f.write_text("import importlib\nm = importlib.import_module('os')\n") - findings = ast_scan_file(f) - pids = [f.pattern_id for f in findings] - assert "ast_dynamic_import" in pids - assert "ast_importlib_import" in pids - - def test_importlib_submodule_import_detected(self, tmp_path): - """`import importlib.util` and similar submodules are flagged.""" - f = tmp_path / "evil.py" - f.write_text("import importlib.util\n") - findings = ast_scan_file(f) - pids = [f.pattern_id for f in findings] - assert "ast_importlib_import" in pids - - def test_importlib_submodule_aliased_import_detected(self, tmp_path): - """`import importlib.machinery as m` (aliased submodule) is flagged.""" - f = tmp_path / "evil.py" - f.write_text("import importlib.machinery as m\n") - findings = ast_scan_file(f) - pids = [f.pattern_id for f in findings] - assert "ast_importlib_import" in pids - - def test_from_importlib_import_detected(self, tmp_path): - """`from importlib import import_module` is flagged.""" - f = tmp_path / "evil.py" - f.write_text("from importlib import import_module\n") - findings = ast_scan_file(f) - pids = [f.pattern_id for f in findings] - assert "ast_importlib_import" in pids - - def test_from_importlib_submodule_import_detected(self, tmp_path): - """`from importlib.util import find_spec` is flagged.""" - f = tmp_path / "evil.py" - f.write_text("from importlib.util import find_spec\n") - findings = ast_scan_file(f) - pids = [f.pattern_id for f in findings] - assert "ast_importlib_import" in pids - - def test_importer_lookalike_not_flagged(self, tmp_path): - """`import importer` must NOT match — prefix check is dot-bounded.""" - f = tmp_path / "ok.py" - f.write_text("import importer\n") - findings = ast_scan_file(f) - pids = [f.pattern_id for f in findings] - assert "ast_importlib_import" not in pids - - def test_from_importer_lookalike_not_flagged(self, tmp_path): - """`from importer import something` must NOT match the importlib check.""" - f = tmp_path / "ok.py" - f.write_text("from importer import something\n") - findings = ast_scan_file(f) - pids = [f.pattern_id for f in findings] - assert "ast_importlib_import" not in pids - - def test_dunder_import_with_computed_arg_detected(self, tmp_path): - """__import__ with non-literal argument is flagged.""" - f = tmp_path / "evil.py" - f.write_text("name = 'os'\nm = __import__(name)\n") - findings = ast_scan_file(f) - pids = [f.pattern_id for f in findings] - assert "ast_dynamic_import_computed" in pids - - def test_dunder_dict_computed_key_detected(self, tmp_path): - """__dict__[] access is flagged.""" - f = tmp_path / "evil.py" - f.write_text("key = 'environ'\nval = obj.__dict__[key]\n") - findings = ast_scan_file(f) - pids = [f.pattern_id for f in findings] - assert "ast_dict_access" in pids - - def test_getattr_with_computed_name_detected(self, tmp_path): - """getattr(obj, computed_name) is flagged.""" - f = tmp_path / "evil.py" - f.write_text("name = 'system'\nfn = getattr(os, name)\n") - findings = ast_scan_file(f) - pids = [f.pattern_id for f in findings] - assert "ast_dynamic_getattr" in pids - - def test_syntax_error_handled_gracefully(self, tmp_path): - """Files with syntax errors should not crash the scanner.""" - f = tmp_path / "bad.py" - f.write_text("def broken(\n") - findings = ast_scan_file(f) - assert isinstance(findings, list) - - def test_literal_dunder_import_not_flagged_by_ast(self, tmp_path): - """__import__('os') with literal string is NOT flagged by AST.""" - f = tmp_path / "ok.py" - f.write_text("m = __import__('os')\n") - findings = ast_scan_file(f) - pids = [f.pattern_id for f in findings] - assert "ast_dynamic_import_computed" not in pids - - def test_full_bypass_payload_now_detected(self, tmp_path): - """The exact bypass payload from #7072 should now be caught.""" - payload = """ -import importlib -parts = ['o', 's'] -m = importlib.import_module(''.join(parts)) -e = m.__dict__[''.join(['e','n','v','i','r','o','n'])] -""" - f = tmp_path / "exfil.py" - f.write_text(payload) - findings = ast_scan_file(f) - pids = [f.pattern_id for f in findings] - assert "ast_dynamic_import" in pids - assert "ast_dict_access" in pids - assert "ast_importlib_import" in pids - - def test_non_python_files_return_empty(self, tmp_path): - """AST scan returns empty list for non-.py files.""" - f = tmp_path / "script.sh" - f.write_text("import importlib\nimportlib.import_module('os')\n") - findings = ast_scan_file(f) - assert findings == [] - - def test_scan_handles_recursion_error_gracefully(self, tmp_path): - """Deeply-nested expressions that blow the visitor recursion limit - must not crash the scan — return whatever findings were collected so far.""" - src = "a" + ".x" * 5000 + "\n" - f = tmp_path / "deep.py" - f.write_text(src) - - original_limit = sys.getrecursionlimit() - sys.setrecursionlimit(200) - try: - findings = ast_scan_file(f) - finally: - sys.setrecursionlimit(original_limit) - - assert isinstance(findings, list) +def test_bypass_payload_detected(tmp_path): + """The exact bypass shape from #7072 is caught.""" + f = tmp_path / "exfil.py" + f.write_text( + "import importlib\n" + "parts = ['o', 's']\n" + "m = importlib.import_module(''.join(parts))\n" + "e = m.__dict__[''.join(['e','n','v'])]\n" + ) + pids = _pids(ast_scan_path(f)) + assert "dynamic_import" in pids + assert "importlib_import" in pids + assert "dict_access" in pids -# --------------------------------------------------------------------------- -# Directory scanner tests -# --------------------------------------------------------------------------- +def test_syntax_error_does_not_crash(tmp_path): + f = tmp_path / "bad.py" + f.write_text("def broken(\n") + assert ast_scan_path(f) == [] -class TestAstScanSkill: - """Directory-level scanning via ast_scan_skill().""" - - def test_scans_all_py_files_in_tree(self, tmp_path): - """All .py files in a skill directory are scanned recursively.""" - skill = tmp_path / "my-skill" - skill.mkdir() - sub = skill / "subpkg" - sub.mkdir() - - (skill / "main.py").write_text("import importlib\n") - (sub / "utils.py").write_text("import importlib.util\n") - - findings = ast_scan_skill(skill) - pids = [f.pattern_id for f in findings] - # Both files should have importlib findings - assert pids.count("ast_importlib_import") == 2 - - def test_skips_ignored_dirs(self, tmp_path): - """__pycache__, venv, .venv, and node_modules directories are skipped.""" - skill = tmp_path / "my-skill" - skill.mkdir() - for dirname in ("__pycache__", "venv", ".venv", "node_modules"): - ignored = skill / dirname - ignored.mkdir() - (ignored / "cached.py").write_text("import importlib\n") - - findings = ast_scan_skill(skill) - assert findings == [] - - def test_skips_non_existent_dir(self, tmp_path): - """Non-existent directory returns empty list.""" - findings = ast_scan_skill(Path("/nonexistent/skill/path")) - assert findings == [] - - def test_non_dir_path(self, tmp_path): - """A file path (not a directory) returns empty list.""" - f = tmp_path / "not_a_dir.py" - f.write_text("import importlib\n") - findings = ast_scan_skill(f) - assert findings == [] +def test_recursion_error_does_not_crash(tmp_path): + f = tmp_path / "deep.py" + f.write_text("a" + ".x" * 5000 + "\n") + orig = sys.getrecursionlimit() + sys.setrecursionlimit(200) + try: + result = ast_scan_path(f) + finally: + sys.setrecursionlimit(orig) + assert isinstance(result, list) -class TestAstScanBundleFiles: - """In-memory bundle scanning for pre-install inspect diagnostics.""" - - def test_scans_python_files_from_bundle(self): - """Python files in source adapter bundle mappings are scanned.""" - findings = ast_scan_bundle_files({ - "SKILL.md": "---\nname: test\n---\n", - "scripts/run.py": "import importlib\n", - "references/readme.md": "import importlib\n", - }) - assert [f.pattern_id for f in findings] == ["ast_importlib_import"] - assert findings[0].file == "scripts/run.py" - - def test_decodes_bytes_bundle_content(self): - """Bundle file content may be bytes; decode with replacement.""" - findings = ast_scan_bundle_files({ - "scripts/run.py": b"from importlib.util import find_spec\n", - }) - assert [f.pattern_id for f in findings] == ["ast_importlib_import"] - - def test_skips_bundle_cache_dirs(self): - """Virtualenv/cache paths in a bundle are ignored.""" - findings = ast_scan_bundle_files({ - "venv/lib/run.py": "import importlib\n", - "__pycache__/cached.py": "import importlib\n", - }) - assert findings == [] +def test_importer_lookalike_not_flagged(tmp_path): + """`import importer` must NOT match — dot-bounded prefix.""" + f = tmp_path / "ok.py" + f.write_text("import importer\nfrom importer import x\n") + assert _pids(ast_scan_path(f)) == [] -# --------------------------------------------------------------------------- -# Report formatting tests -# --------------------------------------------------------------------------- +def test_literal_dunder_import_not_flagged(tmp_path): + """__import__('os') with a literal is not flagged (regex catches those).""" + f = tmp_path / "ok.py" + f.write_text("m = __import__('os')\n") + assert "dynamic_import_computed" not in _pids(ast_scan_path(f)) -class TestFormatAstReport: - """Rich report formatting.""" +def test_non_python_file_returns_empty(tmp_path): + f = tmp_path / "script.sh" + f.write_text("import importlib\n") + assert ast_scan_path(f) == [] - def test_empty_findings(self): - """Empty findings list produces a clean 'nothing found' message.""" - report = format_ast_report([]) - assert "No AST-level patterns detected" in report - def test_empty_with_skill_name(self): - """Report with skill name but no findings.""" - report = format_ast_report([], skill_name="test-skill") - assert "test-skill" in report - assert "No AST-level patterns detected" in report +def test_directory_scans_recursively_and_skips_cache_dirs(tmp_path): + skill = tmp_path / "s" + skill.mkdir() + (skill / "main.py").write_text("import importlib\n") + (skill / "sub").mkdir() + (skill / "sub" / "u.py").write_text("from importlib.util import find_spec\n") + for d in ("__pycache__", ".venv", "venv", "node_modules"): + ignored = skill / d + ignored.mkdir() + (ignored / "junk.py").write_text("import importlib\n") + pids = _pids(ast_scan_path(skill)) + assert pids.count("importlib_import") == 2 - def test_findings_grouped_by_file(self): - """Findings from the same file appear together.""" - findings = [ - AstFinding( - pattern_id="ast_importlib_import", - severity="medium", - category="obfuscation", - file="main.py", - line=1, - match="import importlib", - description="importlib imported", - ), - AstFinding( - pattern_id="ast_dynamic_import", - severity="high", - category="obfuscation", - file="main.py", - line=3, - match="importlib.import_module()", - description="dynamic import via importlib", - ), - ] - report = format_ast_report(findings) - assert "main.py" in report - assert "importlib imported" in report - assert "dynamic import via importlib" in report - assert "2 finding" in report # summary line - assert "Note: AST findings are diagnostic hints" in report - def test_severity_summary(self): - """Report header includes severity counts.""" - findings = [ - AstFinding("id1", "high", "x", "f.py", 1, "m", "desc"), - AstFinding("id2", "high", "x", "f.py", 2, "m", "desc"), - AstFinding("id3", "medium", "x", "f.py", 3, "m", "desc"), - ] - report = format_ast_report(findings) - assert "2 high" in report - assert "1 medium" in report +def test_missing_path_returns_empty(tmp_path): + assert ast_scan_path(tmp_path / "does_not_exist") == [] + + +def test_dynamic_getattr_and_dict_access_detected(tmp_path): + f = tmp_path / "g.py" + f.write_text("name = 'x'\nv = getattr(o, name)\nv = o.__dict__[name]\n") + pids = _pids(ast_scan_path(f)) + assert "dynamic_getattr" in pids + assert "dict_access" in pids + + +def test_format_report_empty(): + assert "No dynamic" in format_ast_report([]) + + +def test_format_report_with_findings(): + findings = [ + ("a.py", 1, "importlib_import", "import importlib — ..."), + ("a.py", 3, "dynamic_import", "importlib.import_module() — ..."), + ] + out = format_ast_report(findings, skill_name="test") + assert "test" in out and "a.py" in out and "L1" in out and "L3" in out + assert "diagnostic hints" in out diff --git a/tools/skills_ast_audit.py b/tools/skills_ast_audit.py index dc32c687c35..e127556c1d9 100644 --- a/tools/skills_ast_audit.py +++ b/tools/skills_ast_audit.py @@ -1,353 +1,133 @@ """ AST-level deep audit for skill Python files — opt-in diagnostic, not a security gate. -This is a standalone diagnostic tool per SECURITY.md spirit: it helps operators -inspect skill code for patterns that *could* enable dynamic import/access -obfuscation, but it is NOT a security boundary. Every pattern flagged here has -legitimate uses. Use your judgment. +Per SECURITY.md §2.4, Skills Guard is in-process heuristics ("useful — not +boundaries"). This module is a separate opt-in diagnostic that flags dynamic +import / dynamic attribute access patterns operators may want to eyeball when +reviewing third-party skill code. Every pattern flagged here has legitimate +uses; findings are hints for human review, not verdicts. -Usage:: - - from tools.skills_ast_audit import ast_scan_skill, format_ast_report - - findings = ast_scan_skill(Path("~/.hermes/skills/some-skill")) - if findings: - print(format_ast_report(findings)) - -CLI integration: ``hermes skills audit --deep`` +CLI: ``hermes skills audit --deep`` """ from __future__ import annotations import ast -from dataclasses import dataclass from pathlib import Path -from typing import Mapping, List, Optional, Union +from typing import List, Tuple + +# (file, line, pattern_id, description) +Finding = Tuple[str, int, str, str] + +_IGNORED_DIRS = {"__pycache__", ".venv", "venv", "node_modules"} -# --------------------------------------------------------------------------- -# Data model -# --------------------------------------------------------------------------- - - -@dataclass -class AstFinding: - """A single finding from AST-level analysis.""" - - pattern_id: str - """Short identifier for deduplication and grouping (e.g. 'ast_importlib_import').""" - - severity: str - """One of 'high', 'medium', 'low' — for display only, not a security claim.""" - - category: str - """Grouping label — currently always 'obfuscation'.""" - - file: str - """Relative path to the file containing the finding.""" - - line: int - """1-based line number.""" - - match: str - """The matched source construct (human-readable snippet).""" - - description: str - """Why this pattern is worth reviewing.""" - - -# --------------------------------------------------------------------------- -# Scanner -# --------------------------------------------------------------------------- - -def _ast_scan_python(content: str, rel_path: str) -> List[AstFinding]: - """Detect obfuscation via dynamic imports, attribute access, and string construction. - - Hostile or pathological input (deeply-nested expressions, malformed source) - must not crash the scan. Both ``ast.parse`` and the visitor traversal are - guarded so parse/visit failures degrade gracefully to "no AST findings" - rather than raising. - """ +def _scan_source(content: str, rel_path: str) -> List[Finding]: try: tree = ast.parse(content) except (SyntaxError, ValueError, RecursionError): return [] - findings: List[AstFinding] = [] + findings: List[Finding] = [] - class _Visitor(ast.NodeVisitor): + class V(ast.NodeVisitor): def visit_Call(self, node): - # Detect importlib.import_module(...) - if ( - isinstance(node.func, ast.Attribute) - and node.func.attr == "import_module" - ): - findings.append( - AstFinding( - pattern_id="ast_dynamic_import", - severity="high", - category="obfuscation", - file=rel_path, - line=node.lineno, - match="importlib.import_module()", - description="dynamic import via importlib — can load arbitrary modules at runtime", - ) - ) - # Detect __import__ with non-literal argument - if isinstance(node.func, ast.Name) and node.func.id == "__import__": + f = node.func + # importlib.import_module(...) + if isinstance(f, ast.Attribute) and f.attr == "import_module": + findings.append((rel_path, node.lineno, "dynamic_import", + "importlib.import_module() — loads arbitrary modules at runtime")) + # __import__() + elif isinstance(f, ast.Name) and f.id == "__import__": if node.args and not isinstance(node.args[0], ast.Constant): - findings.append( - AstFinding( - pattern_id="ast_dynamic_import_computed", - severity="high", - category="obfuscation", - file=rel_path, - line=node.lineno, - match="__import__()", - description="__import__ with dynamically constructed module name", - ) - ) - # Detect getattr with computed attribute name - if isinstance(node.func, ast.Name) and node.func.id == "getattr": - if len(node.args) >= 2 and not isinstance( - node.args[1], ast.Constant - ): - findings.append( - AstFinding( - pattern_id="ast_dynamic_getattr", - severity="medium", - category="obfuscation", - file=rel_path, - line=node.lineno, - match="getattr(, )", - description="getattr with dynamically constructed attribute name", - ) - ) + findings.append((rel_path, node.lineno, "dynamic_import_computed", + "__import__ with non-literal module name")) + # getattr(obj, ) + elif isinstance(f, ast.Name) and f.id == "getattr": + if len(node.args) >= 2 and not isinstance(node.args[1], ast.Constant): + findings.append((rel_path, node.lineno, "dynamic_getattr", + "getattr with non-literal attribute name")) self.generic_visit(node) def visit_Subscript(self, node): - # Detect obj.__dict__[] - if ( - isinstance(node.value, ast.Attribute) - and node.value.attr == "__dict__" - ): - if not isinstance(node.slice, ast.Constant): - findings.append( - AstFinding( - pattern_id="ast_dict_access", - severity="high", - category="obfuscation", - file=rel_path, - line=node.lineno, - match="__dict__[]", - description="dynamic attribute access via __dict__ with computed key", - ) - ) + # obj.__dict__[] + if (isinstance(node.value, ast.Attribute) + and node.value.attr == "__dict__" + and not isinstance(node.slice, ast.Constant)): + findings.append((rel_path, node.lineno, "dict_access", + "__dict__[] — dynamic attribute access")) self.generic_visit(node) def visit_Import(self, node): - # Flag importlib and any importlib.* submodule. - for alias in node.names: - if alias.name == "importlib" or alias.name.startswith( - "importlib." - ): - findings.append( - AstFinding( - pattern_id="ast_importlib_import", - severity="medium", - category="obfuscation", - file=rel_path, - line=node.lineno, - match=f"import {alias.name}", - description="importlib imported — enables dynamic module loading", - ) - ) + for a in node.names: + if a.name == "importlib" or a.name.startswith("importlib."): + findings.append((rel_path, node.lineno, "importlib_import", + f"import {a.name} — enables dynamic module loading")) self.generic_visit(node) def visit_ImportFrom(self, node): - module = node.module or "" - if module == "importlib" or module.startswith("importlib."): - findings.append( - AstFinding( - pattern_id="ast_importlib_import", - severity="medium", - category="obfuscation", - file=rel_path, - line=node.lineno, - match=f"from {module} import ...", - description="importlib imported — enables dynamic module loading", - ) - ) + m = node.module or "" + if m == "importlib" or m.startswith("importlib."): + findings.append((rel_path, node.lineno, "importlib_import", + f"from {m} import ... — enables dynamic module loading")) self.generic_visit(node) try: - _Visitor().visit(tree) + V().visit(tree) except (RecursionError, ValueError, RuntimeError): - # Visitor traversal can fail on hostile input even when ast.parse - # succeeded (e.g. deeply-nested call/attribute chains). Return - # whatever findings we collected before the failure. - return findings + # Hostile/pathological input: return what we collected so far. + pass return findings -def ast_scan_file(file_path: Path, rel_path: Optional[str] = None) -> List[AstFinding]: - """Scan a single Python file and return AST-level findings. +def ast_scan_path(path: Path) -> List[Finding]: + """Scan a single .py file or recursively scan all .py under a directory. - Args: - file_path: Absolute path to the .py file. - rel_path: Relative path for display (defaults to file_path.name). - - Returns: - List of :class:`AstFinding` — empty if the file isn't Python or scan yields nothing. + Returns a list of (file, line, pattern_id, description) tuples. Empty for + non-Python paths, missing paths, or paths with no matching patterns. """ - if file_path.suffix.lower() != ".py": + if path.is_file(): + if path.suffix.lower() != ".py": + return [] + try: + content = path.read_text(encoding="utf-8", errors="replace") + except OSError: + return [] + return _scan_source(content, path.name) + + if not path.is_dir(): return [] - if rel_path is None: - rel_path = file_path.name - - try: - content = file_path.read_text(encoding="utf-8", errors="replace") - except (OSError, UnicodeDecodeError): - return [] - - return _ast_scan_python(content, rel_path) - - -def ast_scan_skill(skill_path: Path) -> List[AstFinding]: - """Recursively scan all Python files in a skill directory. - - Args: - skill_path: Path to the installed skill directory. - - Returns: - Combined list of :class:`AstFinding` across all .py files. - """ - if not skill_path.is_dir(): - return [] - - all_findings: List[AstFinding] = [] - - for py_file in sorted(skill_path.rglob("*.py")): - # Skip __pycache__ and .venv/venv directories - parts = set(py_file.parent.parts) - if parts & {"__pycache__", ".venv", "venv", "node_modules"}: + out: List[Finding] = [] + for py in sorted(path.rglob("*.py")): + if set(py.parent.parts) & _IGNORED_DIRS: continue try: - rel = py_file.relative_to(skill_path).as_posix() + content = py.read_text(encoding="utf-8", errors="replace") + except OSError: + continue + try: + rel = py.relative_to(path).as_posix() except ValueError: - rel = py_file.name - all_findings.extend(ast_scan_file(py_file, rel)) - - return all_findings + rel = py.name + out.extend(_scan_source(content, rel)) + return out -def ast_scan_bundle_files( - files: Mapping[str, Union[str, bytes]], -) -> List[AstFinding]: - """Scan Python files from an in-memory skill bundle. - - This powers ``hermes skills inspect --ast-deep`` so operators can review - a skill before installing it. The input is the bundle's filename -> content - mapping, as returned by the skills hub source adapters. - """ - all_findings: List[AstFinding] = [] - - for rel_path, content in sorted(files.items()): - path = Path(rel_path) - if path.suffix.lower() != ".py": - continue - if set(path.parts) & {"__pycache__", ".venv", "venv", "node_modules"}: - continue - if isinstance(content, bytes): - text = content.decode("utf-8", errors="replace") - else: - text = str(content) - all_findings.extend(_ast_scan_python(text, rel_path)) - - return all_findings - - -# --------------------------------------------------------------------------- -# Rich formatting -# --------------------------------------------------------------------------- - - -def format_ast_report( - findings: List[AstFinding], - skill_name: str = "", -) -> str: - """Format AST findings as a Rich-markup string. - - Args: - findings: List of findings from :func:`ast_scan_skill`. - skill_name: Optional skill name for the report header. - - Returns: - Rich-markup string suitable for ``console.print()``. - """ +def format_ast_report(findings: List[Finding], skill_name: str = "") -> str: + """Plain-text report (Rich-markup-free) grouped by file.""" + header = f"AST deep scan: {skill_name}" if skill_name else "AST deep scan" if not findings: - header = ( - f"[bold]AST Deep Scan: {skill_name}[/]" - if skill_name - else "[bold]AST Deep Scan[/]" - ) - return f"{header}\n[dim green]No AST-level patterns detected.[/]" + return f"{header}\n No dynamic import/access patterns detected." - lines: List[str] = [] - severity_order = {"high": 0, "medium": 1, "low": 2} - findings_sorted = sorted( - findings, - key=lambda f: ( - severity_order.get(f.severity, 99), - f.file, - f.line, - ), - ) - - if skill_name: - lines.append(f"[bold]AST Deep Scan: {skill_name}[/]") - else: - lines.append("[bold]AST Deep Scan[/]") - - total = len(findings_sorted) - high_count = sum(1 for f in findings_sorted if f.severity == "high") - med_count = sum(1 for f in findings_sorted if f.severity == "medium") - low_count = sum(1 for f in findings_sorted if f.severity == "low") - - summary_parts = [] - if high_count: - summary_parts.append(f"[bold red]{high_count} high[/]") - if med_count: - summary_parts.append(f"[yellow]{med_count} medium[/]") - if low_count: - summary_parts.append(f"[dim]{low_count} low[/]") - lines.append( - f"[dim]{total} finding(s)[/] — " - + ", ".join(summary_parts) - if summary_parts - else f"[dim]{total} finding(s)[/]" - ) + lines = [header, f" {len(findings)} finding(s):"] + current = None + for f, line, pid, desc in sorted(findings): + if f != current: + current = f + lines.append(f" {f}") + lines.append(f" L{line} {pid} — {desc}") lines.append("") - - current_file = None - for f in findings_sorted: - if f.file != current_file: - current_file = f.file - lines.append(f" [bold cyan]{f.file}[/]") - sev_color = {"high": "bold red", "medium": "yellow", "low": "dim"}.get( - f.severity, "dim" - ) - lines.append( - f" L{f.line:>4} [{sev_color}]{f.severity:6}[/] {f.description}" - ) - lines.append(f" [dim]{f.match}[/]") - - lines.append("") - lines.append( - "[dim]Note: AST findings are diagnostic hints, not security verdicts. " - "Review each pattern in context.[/]" - ) - + lines.append(" Note: diagnostic hints for human review, not security verdicts.") return "\n".join(lines)