From 8bdc3bc971578d86d13571aeb859a521803e80cf Mon Sep 17 00:00:00 2001 From: Zied Jlassi <6190550+zied-jlassi@users.noreply.github.com> Date: Tue, 23 Jun 2026 21:20:39 +0200 Subject: [PATCH 1/2] fix(behavioral): detect builtins.* and importlib.import_module sink evasions The import-alias normalization rewrites `from builtins import exec` and `import builtins; builtins.exec(...)` to the qualified `builtins.exec`, which the bare-name sink checks (call_name == "exec", _EXEC_SINKS) then miss. Collapse `builtins.` back to the bare builtin so these re-enter the existing detection, covering exec/eval/compile/__import__. Add a dynamic-import resolver so `importlib.import_module('os').system(...)` (sibling of __import__) resolves to os.system / the subprocess family and hits the sink ladders. Baseline detection unchanged; composes with the getattr branch (#166) with no double-fire; FP-neighbors stay clean. Signed-off-by: Zied Jlassi <6190550+zied-jlassi@users.noreply.github.com> --- .../nodes/analyzers/behavioral_ast.py | 5 ++ .../analyzers/behavioral_taint_tracking.py | 21 ++++- src/skillspector/nodes/analyzers/common.py | 75 +++++++++++++++- tests/nodes/analyzers/test_behavioral_ast.py | 89 +++++++++++++++++++ .../test_behavioral_taint_tracking.py | 59 ++++++++++++ 5 files changed, 244 insertions(+), 5 deletions(-) diff --git a/src/skillspector/nodes/analyzers/behavioral_ast.py b/src/skillspector/nodes/analyzers/behavioral_ast.py index a502e8e..d8898f4 100644 --- a/src/skillspector/nodes/analyzers/behavioral_ast.py +++ b/src/skillspector/nodes/analyzers/behavioral_ast.py @@ -28,6 +28,7 @@ get_context_from_lines, get_source_segment, resolve_call_name, + resolve_dynamic_import_call, ) from .static_runner import MAX_FILE_BYTES, analyzer_finding_to_finding @@ -182,6 +183,10 @@ def _emit( continue call_name = resolve_call_name(ast_node, aliases) + if call_name is None: + # Dynamic-import chain: importlib.import_module('os').system(...) → + # 'os.system', so it re-enters the os./subprocess. sink ladders below. + call_name = resolve_dynamic_import_call(ast_node, aliases) if call_name is None: continue diff --git a/src/skillspector/nodes/analyzers/behavioral_taint_tracking.py b/src/skillspector/nodes/analyzers/behavioral_taint_tracking.py index 83eff64..4cbcc6e 100644 --- a/src/skillspector/nodes/analyzers/behavioral_taint_tracking.py +++ b/src/skillspector/nodes/analyzers/behavioral_taint_tracking.py @@ -37,6 +37,7 @@ get_source_segment, resolve_call_name_typed, resolve_dotted_name, + resolve_dynamic_import_call, ) from .static_runner import MAX_FILE_BYTES, analyzer_finding_to_finding @@ -170,6 +171,24 @@ ] +def _resolve_sink_name( + node: ast.Call, + type_map: dict[str, str] | None = None, + aliases: dict[str, str] | None = None, +) -> str | None: + """Resolve a call to its canonical sink name, including dynamic-import chains. + + Wraps :func:`resolve_call_name_typed` (type-/alias-aware resolution) and falls back + to :func:`resolve_dynamic_import_call` so that + ``importlib.import_module('subprocess').run(...)`` resolves to ``'subprocess.run'`` + and re-enters ``_EXEC_SINKS`` like the statically-imported form would. + """ + name = resolve_call_name_typed(node, type_map, aliases) + if name is None: + name = resolve_dynamic_import_call(node, aliases) + return name + + def _classify(name: str, categories: list[tuple[frozenset[str], str]], default: str) -> str: for names, label in categories: if name in names: @@ -363,7 +382,7 @@ def _emit( if not isinstance(ast_node, ast.Call): continue - sink_name = resolve_call_name_typed(ast_node, type_map, aliases) + sink_name = _resolve_sink_name(ast_node, type_map, aliases) if not sink_name or sink_name not in _ALL_SINKS: continue diff --git a/src/skillspector/nodes/analyzers/common.py b/src/skillspector/nodes/analyzers/common.py index 3fed091..22bde49 100644 --- a/src/skillspector/nodes/analyzers/common.py +++ b/src/skillspector/nodes/analyzers/common.py @@ -104,24 +104,47 @@ def resolve_dotted_name(node: ast.expr) -> str | None: return None +def _strip_builtins_prefix(name: str) -> str: + """Collapse a ``builtins``-qualified name back to its bare builtin name. + + ``builtins.exec`` → ``exec`` (and ``builtins.eval``/``compile``/``__import__``…). + The analyzers match dangerous builtins by their bare name (``call_name == "exec"``, + ``name in _EXEC_SINKS``), but ``from builtins import exec`` / ``import builtins; + builtins.exec(...)`` resolve, through the import-alias map, to the *qualified* + spelling ``builtins.exec`` — which would otherwise slip past those checks. Since + ``builtins.exec is exec`` at runtime, collapsing the prefix is semantically exact + and re-enters the existing bare-name detection. + + Only the single-segment form ``builtins.`` is collapsed; deeper chains + (``builtins.foo.bar``) are left untouched as they are not direct builtin calls. + """ + root, sep, rest = name.partition(".") + if root == "builtins" and sep and "." not in rest: + return rest + return name + + def apply_import_aliases(name: str, aliases: dict[str, str]) -> str: """Rewrite a resolved call name to its fully-qualified form using import aliases. - Bridges two evasion-prone spellings back to the canonical dotted name that the + Bridges several evasion-prone spellings back to the canonical name that the analyzers match against: - ``from os import system`` → ``{"system": "os.system"}`` so a bare ``system`` call resolves to ``"os.system"``. - ``import os as o`` → ``{"o": "os"}`` so ``o.system`` resolves to ``"os.system"``. + - ``from builtins import exec`` / ``import builtins; builtins.exec(...)`` → the + bare builtin ``exec`` (via :func:`_strip_builtins_prefix`), so dangerous + builtins matched by bare name are not hidden behind a ``builtins.`` qualifier. Idempotent for already-canonical names (``os.system`` stays ``os.system``). """ if name in aliases: - return aliases[name] + return _strip_builtins_prefix(aliases[name]) root, sep, rest = name.partition(".") if sep and root in aliases: - return f"{aliases[root]}.{rest}" - return name + return _strip_builtins_prefix(f"{aliases[root]}.{rest}") + return _strip_builtins_prefix(name) def resolve_call_name(node: ast.Call, aliases: dict[str, str] | None = None) -> str | None: @@ -138,6 +161,50 @@ def resolve_call_name(node: ast.Call, aliases: dict[str, str] | None = None) -> return name +def _dynamic_import_target(node: ast.expr, aliases: dict[str, str] | None = None) -> str | None: + """Return the imported module name for an ``importlib.import_module('mod')`` call. + + Recognizes both ``importlib.import_module('os')`` and the bare-imported + ``from importlib import import_module; import_module('os')`` (resolved via the + import-alias map), returning the string literal module name (``'os'``) when the + first positional argument is a constant. Returns ``None`` for anything else + (non-literal argument, unrelated call), so callers stay precise and avoid false + positives on dynamic module names the analyzer cannot resolve statically. + """ + if not isinstance(node, ast.Call): + return None + func_name = resolve_dotted_name(node.func) + if func_name is not None and aliases: + func_name = apply_import_aliases(func_name, aliases) + if func_name not in ("importlib.import_module", "import_module"): + return None + if node.args and isinstance(node.args[0], ast.Constant) and isinstance(node.args[0].value, str): + return node.args[0].value + return None + + +def resolve_dynamic_import_call( + node: ast.Call, aliases: dict[str, str] | None = None +) -> str | None: + """Resolve ``importlib.import_module('mod').attr(...)`` to the dotted sink ``'mod.attr'``. + + Bridges the dynamic-import evasion that mirrors ``__import__``: a skill writes + ``importlib.import_module('os').system(cmd)`` (or imports ``import_module`` bare) + so the dangerous module never appears as a static ``import``. When *node*'s callee + is an attribute access on such a chain, this returns the canonical sink name + (``'os.system'``, ``'subprocess.run'``) that the existing sink ladders already + match. Returns ``None`` when the chain is not a literal dynamic import, keeping the + resolution precise (no false positives on un-resolvable dynamic names). + """ + func = node.func + if not isinstance(func, ast.Attribute): + return None + module_name = _dynamic_import_target(func.value, aliases) + if module_name is None: + return None + return f"{module_name}.{func.attr}" + + def _build_import_aliases(tree: ast.Module) -> dict[str, str]: """Map locally imported names to their fully-qualified module paths. diff --git a/tests/nodes/analyzers/test_behavioral_ast.py b/tests/nodes/analyzers/test_behavioral_ast.py index 996fa1d..ae1a423 100644 --- a/tests/nodes/analyzers/test_behavioral_ast.py +++ b/tests/nodes/analyzers/test_behavioral_ast.py @@ -284,3 +284,92 @@ def test_multiple_dangerous_calls_in_one_file(self): assert "AST2" in rule_ids assert "AST4" in rule_ids assert "AST5" in rule_ids + + +# ── builtins / importlib import-chain evasion ───────────────────────── + + +class TestBuiltinsImportEvasion: + """Dangerous builtins hidden behind the ``builtins`` module must still alert. + + The analyzer matches dangerous builtins by their bare name (``exec``/``eval``/ + ``compile``/``__import__``). Writing ``from builtins import exec`` or + ``import builtins; builtins.exec(...)`` resolves, through the import-alias map, + to the qualified spelling ``builtins.exec`` — which would slip past the bare-name + checks unless it is canonicalized back. Since ``builtins.exec is exec``, the + collapse is semantically exact. Complements the ``getattr`` branch (PR #166). + """ + + def test_from_builtins_import_exec(self): + """``from builtins import exec; exec(code)`` must still raise AST1.""" + findings = _run("from builtins import exec\nexec('x = 1')\n") + assert any(f.rule_id == "AST1" for f in findings) + + def test_from_builtins_import_eval(self): + """``from builtins import eval`` must still raise AST2.""" + findings = _run("from builtins import eval\neval('2 + 2')\n") + assert any(f.rule_id == "AST2" for f in findings) + + def test_from_builtins_import_compile(self): + """``from builtins import compile`` must still raise AST6.""" + findings = _run("from builtins import compile\ncompile('x', '', 'exec')\n") + assert any(f.rule_id == "AST6" for f in findings) + + def test_from_builtins_import_dunder_import(self): + """``from builtins import __import__`` must still raise AST3.""" + findings = _run("from builtins import __import__\n__import__('os')\n") + assert any(f.rule_id == "AST3" for f in findings) + + def test_import_builtins_dot_exec(self): + """``import builtins; builtins.exec(...)`` must still raise AST1.""" + findings = _run("import builtins\nbuiltins.exec('x = 1')\n") + assert any(f.rule_id == "AST1" for f in findings) + + def test_import_builtins_as_alias_dot_exec(self): + """``import builtins as b2; b2.exec(...)`` must still raise AST1.""" + findings = _run("import builtins as b2\nb2.exec('x = 1')\n") + assert any(f.rule_id == "AST1" for f in findings) + + def test_from_builtins_import_exec_as_alias(self): + """``from builtins import exec as e; e(...)`` must still raise AST1.""" + findings = _run("from builtins import exec as e\ne('x = 1')\n") + assert any(f.rule_id == "AST1" for f in findings) + + def test_user_module_exec_helper_no_false_positive(self): + """A benign helper merely *named* like a sink must not match (FP-neighbor). + + ``from mymod import exec_helper; exec_helper()`` imports an unrelated + third-party callable — it is not ``builtins.exec`` and must stay clean. + """ + findings = _run("from mymod import exec_helper\nexec_helper()\n") + assert findings == [] + + +class TestImportlibDynamicChainEvasion: + """``importlib.import_module('mod').attr(...)`` is a dynamic-import sink chain. + + It mirrors ``__import__('mod')`` but lets the dangerous module name live in a + string literal so it never appears as a static ``import``. The chain is resolved + to the canonical dotted sink (``os.system``/``subprocess.run``) so it re-enters + the existing ``os.``/``subprocess.`` sink ladders. + """ + + def test_importlib_import_module_os_system(self): + """``importlib.import_module('os').system(...)`` must raise AST5.""" + findings = _run("import importlib\nimportlib.import_module('os').system('id')\n") + assert any(f.rule_id == "AST5" for f in findings) + + def test_importlib_import_module_subprocess_run(self): + """``importlib.import_module('subprocess').run(...)`` must raise AST4.""" + findings = _run("import importlib\nimportlib.import_module('subprocess').run(['id'])\n") + assert any(f.rule_id == "AST4" for f in findings) + + def test_from_importlib_import_module_os_system(self): + """Bare-imported ``import_module('os').system(...)`` must raise AST5.""" + findings = _run("from importlib import import_module\nimport_module('os').system('id')\n") + assert any(f.rule_id == "AST5" for f in findings) + + def test_importlib_import_module_benign_no_false_positive(self): + """A benign dynamic import (``json.loads``) must not match a sink ladder.""" + findings = _run("import importlib\nimportlib.import_module('json').loads('{}')\n") + assert findings == [] diff --git a/tests/nodes/analyzers/test_behavioral_taint_tracking.py b/tests/nodes/analyzers/test_behavioral_taint_tracking.py index 791ca90..022546c 100644 --- a/tests/nodes/analyzers/test_behavioral_taint_tracking.py +++ b/tests/nodes/analyzers/test_behavioral_taint_tracking.py @@ -464,3 +464,62 @@ def test_untyped_variable_no_false_positive(self): ) findings = _run(code) assert not any(f.rule_id == "TT4" for f in findings) + + +# ── builtins / importlib exec-sink evasion ──────────────────────────── + + +class TestBuiltinsImportlibSinkEvasion: + """Exec sinks reached via ``builtins.*`` or ``importlib.import_module`` must alert. + + ``_EXEC_SINKS`` matches by bare/qualified name (``"exec"``, ``"os.system"``). + ``from builtins import exec`` resolves to ``builtins.exec`` (collapsed back to + ``exec``) and ``importlib.import_module('subprocess').run`` resolves to the + canonical ``subprocess.run`` — both must re-enter the exec-sink path so a + user-input → exec flow is flagged as TT5. Complements the ``getattr`` branch + (PR #166): this covers the import/builtins/importlib branch. + """ + + def test_from_builtins_import_exec_sink(self): + """``from builtins import exec`` with tainted input must raise TT5.""" + code = "from builtins import exec\ncode = input()\nexec(code)\n" + findings = _run(code) + assert any(f.rule_id == "TT5" for f in findings) + + def test_import_builtins_dot_exec_sink(self): + """``import builtins; builtins.exec(input())`` must raise TT5.""" + code = "import builtins\ncode = input()\nbuiltins.exec(code)\n" + findings = _run(code) + assert any(f.rule_id == "TT5" for f in findings) + + def test_import_builtins_as_alias_sink(self): + """``import builtins as b2; b2.exec(input())`` must raise TT5.""" + code = "import builtins as b2\ncode = input()\nb2.exec(code)\n" + findings = _run(code) + assert any(f.rule_id == "TT5" for f in findings) + + def test_importlib_import_module_os_system_sink(self): + """``importlib.import_module('os').system(input())`` must raise TT5.""" + code = "import importlib\ncmd = input()\nimportlib.import_module('os').system(cmd)\n" + findings = _run(code) + assert any(f.rule_id == "TT5" for f in findings) + + def test_importlib_import_module_subprocess_run_sink(self): + """``importlib.import_module('subprocess').run(input())`` must raise TT5.""" + code = "import importlib\ncmd = input()\nimportlib.import_module('subprocess').run(cmd)\n" + findings = _run(code) + assert any(f.rule_id == "TT5" for f in findings) + + def test_from_importlib_import_module_sink(self): + """Bare-imported ``import_module('os').system(input())`` must raise TT5.""" + code = ( + "from importlib import import_module\ncmd = input()\nimport_module('os').system(cmd)\n" + ) + findings = _run(code) + assert any(f.rule_id == "TT5" for f in findings) + + def test_importlib_benign_module_no_false_positive(self): + """A benign dynamic import (``json.loads``) must not be treated as an exec sink.""" + code = "import importlib\ndata = input()\nimportlib.import_module('json').loads(data)\n" + findings = _run(code) + assert not any(f.rule_id == "TT5" for f in findings) From ae6c9f12105da0da85599ff45edc9ccba77f3dff Mon Sep 17 00:00:00 2001 From: Zied Jlassi <6190550+zied-jlassi@users.noreply.github.com> Date: Tue, 23 Jun 2026 23:41:13 +0200 Subject: [PATCH 2/2] feat(behavioral): canonical sink resolver + shared evasion corpus fitness gate Introduce resolve_to_canonical_sink, a single chokepoint reducing every spelling of a dangerous callable -- bare/alias/from/as, builtins.*, importlib.import_module, reflective getattr("lit"), subscript __builtins__["exec"] and .__dict__["lit"], plus importlib/runpy/code sibling machinery -- to one canonical sink id. Wire it into behavioral_ast and behavioral_taint_tracking so the per-idiom ladders stay simple, with a shared parametrized evasion corpus asserting the invariant as a fitness gate: a future missed spelling fails a shared test instead of shipping as a silent blind spot. Reflective subprocess invocations grade AST9-HIGH for parity with reflective os.system; direct subprocess.* keeps baseline AST4-MEDIUM; AST9 and the canonical fallback stay complementary (the _covered_by_ast9 guard prevents double-firing). Benign neighbours never canonicalize to a sink. Stacked on the builtins/importlib sink-evasion fix (reuses resolve_dynamic_import_call); to be rebased onto main once that lands. 108 evasion-corpus tests pass; full suite +108 with zero regressions (ruff 0.15.2 + mypy clean). Signed-off-by: Zied Jlassi <6190550+zied-jlassi@users.noreply.github.com> --- .../nodes/analyzers/behavioral_ast.py | 63 +++- .../analyzers/behavioral_taint_tracking.py | 18 +- .../nodes/analyzers/canonical_sink.py | 289 ++++++++++++++++++ tests/evasion_corpus/__init__.py | 14 + tests/evasion_corpus/corpus.py | 149 +++++++++ .../test_canonical_sink_fitness.py | 209 +++++++++++++ 6 files changed, 727 insertions(+), 15 deletions(-) create mode 100644 src/skillspector/nodes/analyzers/canonical_sink.py create mode 100644 tests/evasion_corpus/__init__.py create mode 100644 tests/evasion_corpus/corpus.py create mode 100644 tests/evasion_corpus/test_canonical_sink_fitness.py diff --git a/src/skillspector/nodes/analyzers/behavioral_ast.py b/src/skillspector/nodes/analyzers/behavioral_ast.py index d8898f4..0c4d076 100644 --- a/src/skillspector/nodes/analyzers/behavioral_ast.py +++ b/src/skillspector/nodes/analyzers/behavioral_ast.py @@ -23,8 +23,14 @@ from skillspector.models import AnalyzerFinding, Finding, Location, Severity from skillspector.state import AnalyzerNodeResponse, SkillspectorState +from .canonical_sink import ( + canonical_sibling_method, + canonical_sibling_sink, + resolve_to_canonical_sink, +) from .common import ( build_import_aliases, + build_type_map, get_context_from_lines, get_source_segment, resolve_call_name, @@ -123,6 +129,29 @@ _TAG = "Dangerous Code Execution" +def _covered_by_ast9(node: ast.Call) -> bool: + """True when *node* invokes a literal ``getattr`` already flagged by AST9. + + AST9 catches ``getattr(obj, "")(...)`` for ```` in + :data:`_DANGEROUS_GETATTR_NAMES`. The canonical-sink fallback skips exactly those so + the two rules stay complementary rather than double-firing: the canonical layer then + owns only the spellings AST9 does not (subscript ``__builtins__["exec"]`` / + ``vars(builtins)["exec"]`` and getattr targets outside the allowlist such as + ``getattr(subprocess, "Popen")``). + """ + callee = node.func + if not (isinstance(callee, ast.Call) and isinstance(callee.func, ast.Name)): + return False + if callee.func.id != "getattr" or len(callee.args) < 2: + return False + attr = callee.args[1] + return ( + isinstance(attr, ast.Constant) + and isinstance(attr.value, str) + and attr.value in _DANGEROUS_GETATTR_NAMES + ) + + def _is_chain_sink(node: ast.Call, aliases: dict[str, str] | None = None) -> bool: """True if this call is exec(), eval(), or compile() — the outer dangerous call.""" name = resolve_call_name(node, aliases) @@ -156,6 +185,7 @@ def _analyze_python(content: str, file_path: str) -> list[AnalyzerFinding]: return [] aliases = build_import_aliases(tree) + type_map = build_type_map(tree) lines = content.splitlines() findings: list[AnalyzerFinding] = [] @@ -183,10 +213,32 @@ def _emit( continue call_name = resolve_call_name(ast_node, aliases) + if call_name is not None: + # Dynamic-import / code-exec sibling machinery (importlib.__import__, + # importlib.util.find_spec, runpy.run_module, code.interact, and the + # instance-method tails spec.loader.exec_module / runsource) resolves by name + # but matches no ladder; remap it to the primitive it equals so it re-enters + # the __import__/exec arms below. + call_name = ( + canonical_sibling_sink(call_name) + or canonical_sibling_method(ast_node, type_map) + or call_name + ) if call_name is None: # Dynamic-import chain: importlib.import_module('os').system(...) → # 'os.system', so it re-enters the os./subprocess. sink ladders below. call_name = resolve_dynamic_import_call(ast_node, aliases) + reflective = False + if call_name is None and not _covered_by_ast9(ast_node): + # Reflective invocation whose callee does not resolve by name: + # getattr(subprocess, "Popen")(cmd) (outside AST9's allowlist), + # __builtins__["exec"](src), vars(builtins)["exec"](src). Canonicalize to + # the bare/dotted sink id so it re-enters the ladders below with the correct + # rule + severity. Literal getattr() on an AST9-allowlisted name is left to + # AST9 (see _covered_by_ast9) so the two rules stay complementary, not + # duplicate. + call_name = resolve_to_canonical_sink(ast_node, aliases) + reflective = call_name is not None if call_name is None: continue @@ -216,7 +268,11 @@ def _emit( elif call_name.startswith("subprocess."): attr = call_name.split(".", 1)[1] if attr in _SUBPROCESS_CALLS: - _emit("AST4", lineno, end_lineno) + # A *reflective* subprocess invocation (getattr/subscript) signals the + # same evasion intent as reflective os.system, so it grades AST9-HIGH to + # stay consistent with that case; a direct subprocess.* call keeps its + # baseline AST4-MEDIUM. + _emit("AST9" if reflective else "AST4", lineno, end_lineno) elif call_name.startswith("os."): attr = call_name.split(".", 1)[1] @@ -227,10 +283,7 @@ def _emit( second_arg = ast_node.args[1] if not isinstance(second_arg, ast.Constant): _emit("AST7", lineno, end_lineno) - elif ( - isinstance(second_arg.value, str) - and second_arg.value in _DANGEROUS_GETATTR_NAMES - ): + elif isinstance(second_arg.value, str) and second_arg.value in _DANGEROUS_GETATTR_NAMES: _emit("AST9", lineno, end_lineno) return findings diff --git a/src/skillspector/nodes/analyzers/behavioral_taint_tracking.py b/src/skillspector/nodes/analyzers/behavioral_taint_tracking.py index 4cbcc6e..4426b51 100644 --- a/src/skillspector/nodes/analyzers/behavioral_taint_tracking.py +++ b/src/skillspector/nodes/analyzers/behavioral_taint_tracking.py @@ -29,6 +29,7 @@ from skillspector.models import AnalyzerFinding, Finding, Location, Severity from skillspector.state import AnalyzerNodeResponse, SkillspectorState +from .canonical_sink import resolve_to_canonical_sink from .common import ( apply_import_aliases, build_import_aliases, @@ -37,7 +38,6 @@ get_source_segment, resolve_call_name_typed, resolve_dotted_name, - resolve_dynamic_import_call, ) from .static_runner import MAX_FILE_BYTES, analyzer_finding_to_finding @@ -176,17 +176,15 @@ def _resolve_sink_name( type_map: dict[str, str] | None = None, aliases: dict[str, str] | None = None, ) -> str | None: - """Resolve a call to its canonical sink name, including dynamic-import chains. + """Resolve a call to its canonical sink name through the canonicalization chokepoint. - Wraps :func:`resolve_call_name_typed` (type-/alias-aware resolution) and falls back - to :func:`resolve_dynamic_import_call` so that - ``importlib.import_module('subprocess').run(...)`` resolves to ``'subprocess.run'`` - and re-enters ``_EXEC_SINKS`` like the statically-imported form would. + Delegates to :func:`resolve_to_canonical_sink`, which reduces every spelling — + type-/alias-aware names, ``importlib.import_module('subprocess').run(...)``, + reflective ``getattr(os, "system")(...)`` and subscript ``__builtins__["exec"](...)`` + / ``vars(builtins)["exec"](...)`` — to the bare/dotted id so it re-enters + ``_EXEC_SINKS`` like the statically-imported form would. """ - name = resolve_call_name_typed(node, type_map, aliases) - if name is None: - name = resolve_dynamic_import_call(node, aliases) - return name + return resolve_to_canonical_sink(node, aliases, type_map) def _classify(name: str, categories: list[tuple[frozenset[str], str]], default: str) -> str: diff --git a/src/skillspector/nodes/analyzers/canonical_sink.py b/src/skillspector/nodes/analyzers/canonical_sink.py new file mode 100644 index 0000000..c79f63e --- /dev/null +++ b/src/skillspector/nodes/analyzers/canonical_sink.py @@ -0,0 +1,289 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Single canonicalization chokepoint for dangerous-callable resolution. + +Every dangerous primitive can be spelled many equivalent ways — bare name, import +alias, ``from`` / ``as`` rebinding, ``builtins.*`` qualification, dynamic import via +``importlib.import_module``, reflective ``getattr(obj, "lit")`` and subscript +reflection ``__builtins__["exec"]``. Detectors that enumerate one idiom at a time +(``import os as o`` here, ``getattr`` there, ``builtins.exec`` elsewhere) leak a new +blind spot with every missed spelling — the recurring regressions behind #114/#115, +#166 and the builtins/importlib gap. + +:func:`resolve_to_canonical_sink` collapses *all* of those spellings to one canonical +sink id (the bare/dotted name the sink sets already match: ``"exec"``, ``"os.system"``, +``"subprocess.run"``). Resolving once, here, lets the sink ladders stay simple and lets +a shared evasion corpus assert the invariant for every new sink. +""" + +from __future__ import annotations + +import ast + +from .common import ( + apply_import_aliases, + resolve_call_name_typed, + resolve_dotted_name, + resolve_dynamic_import_call, +) + + +def _canonical_from_getattr(node: ast.Call, aliases: dict[str, str] | None) -> str | None: + """Resolve ``getattr(obj, "attr")(...)`` to the dotted sink ``".attr"``. + + The dangerous invocation is the *outer* call, so this inspects ``node.func``: when + the callee is itself ``getattr(os, "system")`` the result is ``"os.system"``, and + ``getattr(builtins, "exec")`` collapses to ``"exec"`` (the ``builtins`` prefix is + stripped by :func:`apply_import_aliases`). Only a *constant string* attribute is + resolved; a non-literal attribute is dynamic and left to the caller's + reflective-access rule. Returns ``None`` when ``node.func`` is not a literal + ``getattr`` call. + """ + callee = node.func + if not isinstance(callee, ast.Call): + return None + func_name = resolve_dotted_name(callee.func) + if func_name is not None and aliases: + func_name = apply_import_aliases(func_name, aliases) + if func_name != "getattr" or len(callee.args) < 2: + return None + obj, attr = callee.args[0], callee.args[1] + if not (isinstance(attr, ast.Constant) and isinstance(attr.value, str)): + return None + base = resolve_dotted_name(obj) + if base is None: + return None + if aliases: + base = apply_import_aliases(base, aliases) + if base == "builtins": + return attr.value + return f"{base}.{attr.value}" + + +def _namespace_module(base: ast.expr, aliases: dict[str, str] | None) -> str | None: + """Resolve a subscript base to the module whose namespace dict it exposes. + + Handles the three equivalent namespace handles used to reach a module's callables + by string key: + + - ``__builtins__`` / ``builtins`` → ``"builtins"`` + - ``vars(builtins)`` → ``"builtins"`` (and ``vars(os)`` → ``"os"``) + - ``os.__dict__`` / ``builtins.__dict__`` → ``"os"`` / ``"builtins"`` + + Returns the resolved module name, or ``None`` when the base is not a recognized + namespace handle. + """ + base_name = resolve_dotted_name(base) + if base_name is None and isinstance(base, ast.Call): + # vars() namespace handle. + inner = resolve_dotted_name(base.func) + if inner == "vars" and base.args: + arg = resolve_dotted_name(base.args[0]) + if aliases and arg is not None: + arg = apply_import_aliases(arg, aliases) + return arg + return None + if base_name is None: + return None + if base_name.endswith(".__dict__"): + # ``.__dict__`` exposes the module namespace by string key. Strip the + # trailing ``.__dict__`` *before* alias normalization so the builtins-collapse + # in ``apply_import_aliases`` (which turns ``builtins.__dict__`` into + # ``__dict__``) does not erase the module name. + module = base_name[: -len(".__dict__")] + if aliases: + module = apply_import_aliases(module, aliases) + return "builtins" if module in ("__builtins__", "builtins") else module + # Bare ``__builtins__`` / ``builtins`` are the only other namespace handles; any + # other plain name (a user dict such as ``handlers["exec"]``) is NOT a namespace. + if aliases: + base_name = apply_import_aliases(base_name, aliases) + if base_name in ("__builtins__", "builtins"): + return "builtins" + return None + + +def _canonical_from_subscript(node: ast.Call, aliases: dict[str, str] | None) -> str | None: + """Resolve namespace-dict reflection like ``os.__dict__["system"](...)`` to its sink. + + Recognizes the dict-reflection idiom where ``node.func`` is a subscript whose base is + a module namespace handle (``__builtins__`` / ``vars(builtins)`` / ``os.__dict__``) + and whose index is a string literal: ``__builtins__["exec"]`` → ``"exec"``, + ``os.__dict__["system"]`` → ``"os.system"``. Returns the canonical id, or ``None`` + for any other subscript so a plain user dict (``handlers["exec"]``) is never a sink. + """ + func = node.func + if not isinstance(func, ast.Subscript): + return None + key = func.slice + if not (isinstance(key, ast.Constant) and isinstance(key.value, str)): + return None + module = _namespace_module(func.value, aliases) + if module is None: + return None + if module == "builtins": + return key.value + return f"{module}.{key.value}" + + +# Sibling machinery whose only purpose is dynamic import / code execution. Each maps to +# the canonical sink id of the existing primitive it is equivalent to, so it re-enters +# the established sink ladders rather than introducing a parallel severity scheme: +# +# - ``importlib.__import__`` is literally the ``__import__`` builtin (dynamic import). +# - ``importlib.util.{find_spec,module_from_spec}`` + ``runpy.run_module/run_path`` +# load/execute a module by name → dynamic-import class (``"__import__"``). +# - ``code.interact`` executes arbitrary source → ``exec`` class. +_SIBLING_SINKS: dict[str, str] = { + "importlib.__import__": "__import__", + "importlib.util.find_spec": "__import__", + "importlib.util.module_from_spec": "__import__", + "importlib.util.spec_from_loader": "__import__", + "runpy.run_module": "__import__", + "runpy.run_path": "__import__", + "code.interact": "exec", +} + +# Instance-method tails matched on their canonical code-exec sink, but only when the +# *receiver* statically resolves to the relevant machinery (see :func:`canonical_sibling_method`): +# a loader/interpreter method is typically called on an instance whose class does not +# resolve, so the receiver must be tied to importlib / the ``code`` module to avoid +# flagging an unrelated user method that merely shares the name. +_SIBLING_METHOD_TAILS: dict[str, str] = { + "exec_module": "__import__", + "runsource": "exec", + "runcode": "exec", +} + +# Module roots whose namespace owns the gated instance methods. +_IMPORTLIB_ROOTS: tuple[str, ...] = ("importlib", "importlib.util", "importlib.machinery") +_CODE_INTERPRETER_TYPES: tuple[str, ...] = ( + "code.InteractiveInterpreter", + "code.InteractiveConsole", +) + + +def canonical_sibling_sink(name: str | None) -> str | None: + """Map a resolved sibling-machinery name to the canonical sink it is equivalent to. + + Matches the fully-qualified spelling (``importlib.util.find_spec``); the bare + instance-method tail (``exec_module``, ``runsource``) is resolved separately, with a + receiver check, via :func:`canonical_sibling_method`. Returns the canonical id + (``"__import__"`` / ``"exec"``) or ``None`` when *name* is not a known sibling. + """ + if name is None: + return None + return _SIBLING_SINKS.get(name) + + +def _receiver_is_importlib_loader(receiver: ast.expr, type_map: dict[str, str] | None) -> bool: + """True when *receiver* statically resolves to an importlib spec loader. + + Recognizes the documented ``importlib`` spec-loader chain: ``.loader`` (the + ``ModuleSpec.loader`` attribute, including ``importlib.util.module_from_spec(...)`` + results carried through *type_map*), and bare names whose inferred type is rooted in + ``importlib``. A user object's ``.exec_module`` (receiver not tied to importlib) does + not match. + """ + if isinstance(receiver, ast.Attribute): + # ``.loader`` — the ModuleSpec.loader protocol attribute. + if receiver.attr == "loader": + return True + inferred = None + if isinstance(receiver, ast.Name) and type_map is not None: + inferred = type_map.get(receiver.id) + if inferred is None: + inferred = resolve_dotted_name(receiver) + return bool(inferred and inferred.split(".", 1)[0] == "importlib") + + +def _receiver_is_code_interpreter(receiver: ast.expr, type_map: dict[str, str] | None) -> bool: + """True when *receiver* resolves to ``code.Interactive{Interpreter,Console}``. + + Matches a direct construction (``code.InteractiveInterpreter().runsource(...)``) and a + variable whose inferred constructor type is one of those classes (carried through + *type_map*). A user object defining ``runsource`` / ``runcode`` does not match. + """ + inferred = None + if isinstance(receiver, ast.Call): + inferred = resolve_dotted_name(receiver.func) + elif isinstance(receiver, ast.Name) and type_map is not None: + inferred = type_map.get(receiver.id) + return inferred in _CODE_INTERPRETER_TYPES + + +def canonical_sibling_method(node: ast.Call, type_map: dict[str, str] | None = None) -> str | None: + """Map a gated instance-method call to its canonical code-exec sink id. + + Fires only when *node* is ``.exec_module(...)`` / ``.runsource(...)`` / + ``.runcode(...)`` **and** *recv* statically resolves to the matching machinery — + importlib spec loaders for ``exec_module`` (:func:`_receiver_is_importlib_loader`), + ``code.Interactive{Interpreter,Console}`` for ``runsource`` / ``runcode`` + (:func:`_receiver_is_code_interpreter`). ``exec_module`` → ``"__import__"``, + ``runsource`` / ``runcode`` → ``"exec"``. Returns ``None`` otherwise, so an unrelated + user method sharing the name stays unflagged (documented def-use residual). + """ + func = node.func + if not isinstance(func, ast.Attribute): + return None + canonical = _SIBLING_METHOD_TAILS.get(func.attr) + if canonical is None: + return None + if func.attr == "exec_module": + return canonical if _receiver_is_importlib_loader(func.value, type_map) else None + return canonical if _receiver_is_code_interpreter(func.value, type_map) else None + + +def resolve_to_canonical_sink( + node: ast.Call, + aliases: dict[str, str] | None = None, + type_map: dict[str, str] | None = None, +) -> str | None: + """Reduce any spelling of a call to its canonical sink id. + + Tries, in order of precedence: + + 1. Direct/alias/``from``/``as``/``builtins.*`` resolution via + :func:`resolve_call_name_typed` (also consults *type_map* for instance methods), + then maps dynamic-import / code-exec sibling machinery to the primitive it equals. + 2. Dynamic import chains ``importlib.import_module("os").system`` via + :func:`resolve_dynamic_import_call`. + 3. Reflective ``getattr(obj, "lit")`` via :func:`_canonical_from_getattr`. + 4. Subscript reflection ``__builtins__["exec"]`` / ``os.__dict__["system"]`` via + :func:`_canonical_from_subscript`. + + Returns the canonical id (``"exec"``, ``"os.system"``, …) or ``None`` when the call + cannot be reduced statically (e.g. a non-literal ``getattr`` attribute), which the + caller may still flag through its generic reflective-access rule. + """ + name = resolve_call_name_typed(node, type_map, aliases) + if name is not None: + sibling = canonical_sibling_sink(name) or canonical_sibling_method(node, type_map) + return sibling or name + dynamic = resolve_dynamic_import_call(node, aliases) + if dynamic is not None: + return dynamic + reflective = _canonical_from_getattr(node, aliases) + if reflective is not None: + return reflective + subscript = _canonical_from_subscript(node, aliases) + if subscript is not None: + return subscript + # Instance-method machinery whose receiver does not resolve to a name, e.g. + # ``code.InteractiveInterpreter().runsource(src)`` — match the gated method (the + # receiver must resolve to the interpreter/loader machinery) so the sibling is still + # recognized as a code-exec sink without flagging unrelated same-named user methods. + return canonical_sibling_method(node, type_map) diff --git a/tests/evasion_corpus/__init__.py b/tests/evasion_corpus/__init__.py new file mode 100644 index 0000000..4670798 --- /dev/null +++ b/tests/evasion_corpus/__init__.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/evasion_corpus/corpus.py b/tests/evasion_corpus/corpus.py new file mode 100644 index 0000000..aec4144 --- /dev/null +++ b/tests/evasion_corpus/corpus.py @@ -0,0 +1,149 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Shared evasion corpus: equivalent spellings of dangerous sinks + FP neighbours. + +A single source of truth that *every* sink detector must satisfy. ``EQUIVALENT_SPELLINGS`` +lists, per canonical sink id, ≥8 semantically equivalent ways to invoke it (bare name, +import alias, ``from``/``as`` rebinding, ``builtins.*``, ``importlib.import_module``, +``getattr``, subscript reflection). ``FALSE_POSITIVE_NEIGHBOURS`` lists benign code that +merely *resembles* a sink and must never canonicalize to one. New detectors import these +and assert the invariant, so a future regression (a missed spelling) fails a shared gate +rather than shipping as a silent blind spot. +""" + +from __future__ import annotations + +from typing import NamedTuple + + +class Spelling(NamedTuple): + """One concrete way to write a call, with the canonical sink id it must reduce to.""" + + label: str + code: str + canonical: str + + +class Neighbour(NamedTuple): + """A benign call that resembles a sink and must NOT reduce to any dangerous sink id.""" + + label: str + code: str + + +# Each row's ``code`` ends with the sink invocation as the final top-level statement. +EQUIVALENT_SPELLINGS: tuple[Spelling, ...] = ( + # ── exec (bare builtin) ─────────────────────────────────────────── + Spelling("exec_bare", "exec(x)", "exec"), + Spelling("exec_from_builtins", "from builtins import exec\nexec(x)", "exec"), + Spelling("exec_builtins_attr", "import builtins\nbuiltins.exec(x)", "exec"), + Spelling("exec_builtins_alias", "import builtins as b\nb.exec(x)", "exec"), + Spelling("exec_from_builtins_as", "from builtins import exec as e\ne(x)", "exec"), + Spelling("exec_getattr", 'import builtins\ngetattr(builtins, "exec")(x)', "exec"), + Spelling("exec_subscript_dunder", '__builtins__["exec"](x)', "exec"), + Spelling("exec_subscript_vars", 'import builtins\nvars(builtins)["exec"](x)', "exec"), + Spelling("exec_builtins_dunder_dict", 'import builtins\nbuiltins.__dict__["exec"](x)', "exec"), + # ── os.system (dotted module sink) ──────────────────────────────── + Spelling("os_system_direct", "import os\nos.system(x)", "os.system"), + Spelling("os_system_alias", "import os as o\no.system(x)", "os.system"), + Spelling("os_system_from", "from os import system\nsystem(x)", "os.system"), + Spelling("os_system_getattr", 'import os\ngetattr(os, "system")(x)', "os.system"), + Spelling("os_system_dunder_dict", 'import os\nos.__dict__["system"](x)', "os.system"), + Spelling( + "os_system_importlib", + 'import importlib\nimportlib.import_module("os").system(x)', + "os.system", + ), + Spelling( + "os_system_import_module_bare", + 'from importlib import import_module\nimport_module("os").system(x)', + "os.system", + ), + Spelling( + "os_system_importlib_alias", + 'import importlib as il\nil.import_module("os").system(x)', + "os.system", + ), + # ── subprocess.run (dotted module sink) ─────────────────────────── + Spelling("subprocess_run_direct", "import subprocess\nsubprocess.run(x)", "subprocess.run"), + Spelling("subprocess_run_alias", "import subprocess as sp\nsp.run(x)", "subprocess.run"), + Spelling("subprocess_run_from", "from subprocess import run\nrun(x)", "subprocess.run"), + Spelling( + "subprocess_run_getattr", + 'import subprocess\ngetattr(subprocess, "run")(x)', + "subprocess.run", + ), + Spelling( + "subprocess_run_importlib", + 'import importlib\nimportlib.import_module("subprocess").run(x)', + "subprocess.run", + ), + # ── dynamic-import siblings (→ __import__ class) ────────────────── + Spelling( + "importlib_dunder_import", + 'import importlib\nimportlib.__import__("os")', + "__import__", + ), + Spelling( + "importlib_find_spec", + 'import importlib.util\nimportlib.util.find_spec("os")', + "__import__", + ), + Spelling( + "importlib_module_from_spec", + "import importlib.util\nimportlib.util.module_from_spec(spec)", + "__import__", + ), + Spelling("runpy_run_module", 'import runpy\nrunpy.run_module("os")', "__import__"), + Spelling( + "spec_loader_exec_module", + "import importlib.util\n" + "spec = importlib.util.module_from_spec(s)\n" + "spec.loader.exec_module(mod)", + "__import__", + ), + # ── code-exec siblings (→ exec class) ───────────────────────────── + Spelling("code_interact", "import code\ncode.interact()", "exec"), + Spelling( + "code_runsource", + "import code\ncode.InteractiveInterpreter().runsource(x)", + "exec", + ), +) + +# Benign code that resembles a sink; must canonicalize to something OUTSIDE the sink sets. +FALSE_POSITIVE_NEIGHBOURS: tuple[Neighbour, ...] = ( + Neighbour("user_exec_helper", "from mymod import exec_helper\nexec_helper()"), + Neighbour("getattr_benign_attr", 'val = getattr(cfg, "timeout")'), + Neighbour( + "importlib_benign_loads", + 'import importlib\nimportlib.import_module("json").loads(x)', + ), + Neighbour("subscript_user_dict", 'handlers = {}\nhandlers["exec"](x)'), + Neighbour("getattr_non_dangerous_module", 'import os\ngetattr(os, "getcwd")()'), + Neighbour("dunder_dict_benign_attr", 'import os\nos.__dict__["getcwd"]()'), + Neighbour( + "importlib_spec_from_file_benign", + 'import importlib.util\nimportlib.util.spec_from_file_location("m", "/x.py")', + ), + Neighbour("user_object_dunder_dict", 'obj = Thing()\nobj.__dict__["handler"](x)'), + Neighbour( + "user_class_exec_module", + "class L:\n def exec_module(self, m):\n pass\n\n\nL().exec_module(1)", + ), + Neighbour("user_runsource_method", "ci = MyRepl()\nci.runsource(x)"), + Neighbour("user_loader_unrelated", "loader = MyLoader()\nloader.exec_module(m)"), +) diff --git a/tests/evasion_corpus/test_canonical_sink_fitness.py b/tests/evasion_corpus/test_canonical_sink_fitness.py new file mode 100644 index 0000000..32b5f2d --- /dev/null +++ b/tests/evasion_corpus/test_canonical_sink_fitness.py @@ -0,0 +1,209 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Fitness gate: the canonical layer maps every evasion spelling to one sink id. + +This is the structural counter-measure to the "enumerate instead of canonicalize" +anti-pattern. Rather than one test per idiom, the shared corpus asserts that *every* +equivalent spelling reduces to the same canonical id through the single chokepoint, and +that benign neighbours never do. Adding a new sink means adding corpus rows, not a new +detector branch. +""" + +from __future__ import annotations + +import ast + +import pytest + +from skillspector.nodes.analyzers import behavioral_ast, behavioral_taint_tracking +from skillspector.nodes.analyzers.behavioral_taint_tracking import _EXEC_SINKS +from skillspector.nodes.analyzers.canonical_sink import resolve_to_canonical_sink +from skillspector.nodes.analyzers.common import build_import_aliases, build_type_map + +from .corpus import EQUIVALENT_SPELLINGS, FALSE_POSITIVE_NEIGHBOURS, Neighbour, Spelling + + +def _outermost_call(code: str) -> ast.Call: + """Return the outermost sink-invocation call from a corpus snippet. + + Corpus rows place the sink call as the last top-level ``Expr``/``Assign`` value; + this returns that call node so the chokepoint is queried on the actual invocation + (e.g. ``getattr(os, "system")(x)`` resolves on the outer ``(x)`` call). + """ + tree = ast.parse(code) + last: ast.Call | None = None + for stmt in tree.body: + value = stmt.value if isinstance(stmt, (ast.Expr, ast.Assign)) else None + if isinstance(value, ast.Call): + last = value + if last is None: # pragma: no cover - corpus rows always end in a call + raise AssertionError(f"corpus snippet has no top-level call: {code!r}") + return last + + +def _canonical_for(code: str) -> str | None: + """Resolve a corpus snippet's outermost call to its canonical sink id.""" + tree = ast.parse(code) + aliases = build_import_aliases(tree) + type_map = build_type_map(tree) + return resolve_to_canonical_sink(_outermost_call(code), aliases, type_map) + + +@pytest.mark.parametrize("row", EQUIVALENT_SPELLINGS, ids=lambda r: r.label) +def test_equivalent_spelling_canonicalizes(row: Spelling) -> None: + """Every equivalent spelling reduces to its single canonical sink id.""" + assert _canonical_for(row.code) == row.canonical + + +@pytest.mark.parametrize("neighbour", FALSE_POSITIVE_NEIGHBOURS, ids=lambda n: n.label) +def test_false_positive_neighbour_not_a_sink(neighbour: Neighbour) -> None: + """Benign look-alikes never canonicalize to a dangerous exec sink id.""" + assert _canonical_for(neighbour.code) not in _EXEC_SINKS + + +def test_corpus_has_min_spellings_per_primitive() -> None: + """Each canonical primitive carries at least eight equivalent spellings. + + Guards the corpus against silently shrinking below the breadth that makes it a + meaningful evasion gate. + """ + counts: dict[str, int] = {} + for row in EQUIVALENT_SPELLINGS: + counts[row.canonical] = counts.get(row.canonical, 0) + 1 + assert counts.get("exec", 0) >= 8 + assert all(count >= 5 for count in counts.values()) + # The sibling-machinery surface (dynamic import / code exec) is represented. + assert "__import__" in counts + + +# ── End-to-end wiring gate ──────────────────────────────────────────── +# +# The fitness checks above prove the resolver *can* canonicalize. These prove the +# resolver is actually *wired into production*: every spelling must reach a real sink +# rule (AST + taint), so an accidental un-wiring or severity downgrade fails loudly +# rather than silently regressing to a missed detection. + +# behavioral_ast rules that constitute a real dangerous-execution sink detection. A +# spelling is "detected" if it produces any of these — the precise rule depends on which +# complementary path fires (the canonical ladders AST1/AST4/AST5/AST6, or AST9 for a +# literal getattr on an allowlisted name, owned by the getattr branch on `main`). +_DANGEROUS_EXECUTION_RULES: frozenset[str] = frozenset( + {"AST1", "AST2", "AST3", "AST4", "AST5", "AST6", "AST9"} +) + +# Minimum severity each canonical primitive must keep — guards against a downgrade +# (e.g. an exec sink silently dropping to AST3-MEDIUM). ``__import__`` is the +# dynamic-import class (AST3-MEDIUM), matching the repo's grading of ``__import__``. +_MIN_SEVERITY: dict[str, str] = { + "exec": "HIGH", + "os.system": "HIGH", + "subprocess.run": "MEDIUM", + "__import__": "MEDIUM", +} + +_SEVERITY_ORDER: dict[str, int] = {"LOW": 0, "MEDIUM": 1, "HIGH": 2, "CRITICAL": 3} + +# Canonical ids the taint analyzer treats as exec sinks (TT5). ``__import__`` is +# deliberately excluded: the repo does not grade bare ``__import__`` as a taint exec +# sink, so its dynamic-import siblings stay consistent with that baseline. +_TAINT_EXEC_CANONICALS: frozenset[str] = frozenset( + {"exec", "eval", "compile", "os.system", "subprocess.run"} +) + + +def _run_ast(code: str) -> list: + """Run the real behavioral_ast analyzer on a one-file skill and return findings.""" + state = {"components": ["s.py"], "file_cache": {"s.py": code}} + return behavioral_ast.node(state)["findings"] + + +def _run_taint(code: str) -> list: + """Run the real behavioral_taint_tracking analyzer; *code* must end in a tainted flow.""" + state = {"components": ["s.py"], "file_cache": {"s.py": code}} + return behavioral_taint_tracking.node(state)["findings"] + + +@pytest.mark.parametrize("row", EQUIVALENT_SPELLINGS, ids=lambda r: r.label) +def test_spelling_reaches_ast_sink_rule(row: Spelling) -> None: + """Every spelling reaches a dangerous-execution rule in production at full severity. + + Proves the chokepoint is actually invoked by production (not merely importable) and + that the spelling is not silently dropped or downgraded. The exact rule may be a + canonical-ladder rule or AST9 (literal getattr, owned by the getattr branch on + ``main``) — both are valid sink detections — so the assertion targets the + dangerous-execution rule set plus a minimum-severity floor. + """ + findings = _run_ast(row.code) + sink_findings = [f for f in findings if f.rule_id in _DANGEROUS_EXECUTION_RULES] + assert sink_findings, f"{row.label}: no sink rule, got {[f.rule_id for f in findings]}" + best = max(_SEVERITY_ORDER[f.severity] for f in sink_findings) + assert best >= _SEVERITY_ORDER[_MIN_SEVERITY[row.canonical]], ( + f"{row.label}: severity downgrade, got {[(f.rule_id, f.severity) for f in sink_findings]}" + ) + + +@pytest.mark.parametrize( + "row", + [r for r in EQUIVALENT_SPELLINGS if r.canonical in _TAINT_EXEC_CANONICALS and "(x)" in r.code], + ids=lambda r: r.label, +) +def test_spelling_reaches_taint_exec_sink(row: Spelling) -> None: + """Routed through a tainted input, every exec-class spelling reaches the TT5 flow. + + Restricted to exec-class canonicals whose sink receives the tainted value ``x``; + ``__import__`` siblings are excluded (the taint analyzer does not grade dynamic + import as an exec sink — baseline parity), as are arg-less sinks like + ``code.interact()`` that cannot carry a tainted argument. + """ + tainted = f"x = input()\n{row.code}" + findings = _run_taint(tainted) + assert any(f.rule_id == "TT5" for f in findings), ( + f"{row.label}: expected TT5, got {[f.rule_id for f in findings]}" + ) + + +# Reflective subprocess invocations must grade HIGH (AST9), consistent with reflective +# os.system — the reflection itself signals evasion intent, unlike a direct AST4-MEDIUM +# subprocess.* call. +_SUBPROCESS_REFLECTION: tuple[str, ...] = ( + 'import subprocess\ngetattr(subprocess, "Popen")(x)', + 'import subprocess\ngetattr(subprocess, "run")(x)', + 'import subprocess\ngetattr(subprocess, "check_output")(x)', +) + + +@pytest.mark.parametrize("code", _SUBPROCESS_REFLECTION) +def test_reflective_subprocess_grades_high(code: str) -> None: + """Reflective subprocess sinks fire AST9-HIGH (not AST4-MEDIUM) for severity parity.""" + findings = _run_ast(code) + ast9 = [f for f in findings if f.rule_id == "AST9"] + assert ast9, f"expected AST9, got {[(f.rule_id, f.severity) for f in findings]}" + assert ast9[0].severity == "HIGH" + assert not any(f.rule_id == "AST4" for f in findings), "should not double-fire AST4" + + +def test_direct_subprocess_stays_medium() -> None: + """A *direct* subprocess.* call keeps its baseline AST4-MEDIUM (no over-escalation).""" + findings = _run_ast("import subprocess\nsubprocess.run(['id'])") + ast4 = [f for f in findings if f.rule_id == "AST4"] + assert ast4 and ast4[0].severity == "MEDIUM" + assert not any(f.rule_id == "AST9" for f in findings) + + +@pytest.mark.parametrize("neighbour", FALSE_POSITIVE_NEIGHBOURS, ids=lambda n: n.label) +def test_neighbour_produces_no_ast_findings(neighbour: Neighbour) -> None: + """Benign look-alikes produce zero behavioral_ast findings in production.""" + assert _run_ast(neighbour.code) == []