Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/skillspector/nodes/analyzers/behavioral_ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
get_context_from_lines,
get_source_segment,
resolve_call_name,
resolve_dynamic_import_call,
)
from .static_runner import MAX_FILE_BYTES, analyzer_finding_to_finding

Expand Down Expand Up @@ -182,6 +183,10 @@ def _emit(
continue

call_name = resolve_call_name(ast_node, aliases)
if call_name is None:
# Dynamic-import chain: importlib.import_module('os').system(...) →
# 'os.system', so it re-enters the os./subprocess. sink ladders below.
call_name = resolve_dynamic_import_call(ast_node, aliases)
if call_name is None:
continue

Expand Down
21 changes: 20 additions & 1 deletion src/skillspector/nodes/analyzers/behavioral_taint_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
get_source_segment,
resolve_call_name_typed,
resolve_dotted_name,
resolve_dynamic_import_call,
)
from .static_runner import MAX_FILE_BYTES, analyzer_finding_to_finding

Expand Down Expand Up @@ -170,6 +171,24 @@
]


def _resolve_sink_name(
node: ast.Call,
type_map: dict[str, str] | None = None,
aliases: dict[str, str] | None = None,
) -> str | None:
"""Resolve a call to its canonical sink name, including dynamic-import chains.

Wraps :func:`resolve_call_name_typed` (type-/alias-aware resolution) and falls back
to :func:`resolve_dynamic_import_call` so that
``importlib.import_module('subprocess').run(...)`` resolves to ``'subprocess.run'``
and re-enters ``_EXEC_SINKS`` like the statically-imported form would.
"""
name = resolve_call_name_typed(node, type_map, aliases)
if name is None:
name = resolve_dynamic_import_call(node, aliases)
return name


def _classify(name: str, categories: list[tuple[frozenset[str], str]], default: str) -> str:
for names, label in categories:
if name in names:
Expand Down Expand Up @@ -363,7 +382,7 @@ def _emit(
if not isinstance(ast_node, ast.Call):
continue

sink_name = resolve_call_name_typed(ast_node, type_map, aliases)
sink_name = _resolve_sink_name(ast_node, type_map, aliases)
if not sink_name or sink_name not in _ALL_SINKS:
continue

Expand Down
75 changes: 71 additions & 4 deletions src/skillspector/nodes/analyzers/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,24 +104,47 @@ def resolve_dotted_name(node: ast.expr) -> str | None:
return None


def _strip_builtins_prefix(name: str) -> str:
"""Collapse a ``builtins``-qualified name back to its bare builtin name.

``builtins.exec`` → ``exec`` (and ``builtins.eval``/``compile``/``__import__``…).
The analyzers match dangerous builtins by their bare name (``call_name == "exec"``,
``name in _EXEC_SINKS``), but ``from builtins import exec`` / ``import builtins;
builtins.exec(...)`` resolve, through the import-alias map, to the *qualified*
spelling ``builtins.exec`` — which would otherwise slip past those checks. Since
``builtins.exec is exec`` at runtime, collapsing the prefix is semantically exact
and re-enters the existing bare-name detection.

Only the single-segment form ``builtins.<attr>`` is collapsed; deeper chains
(``builtins.foo.bar``) are left untouched as they are not direct builtin calls.
"""
root, sep, rest = name.partition(".")
if root == "builtins" and sep and "." not in rest:
return rest
return name


def apply_import_aliases(name: str, aliases: dict[str, str]) -> str:
"""Rewrite a resolved call name to its fully-qualified form using import aliases.

Bridges two evasion-prone spellings back to the canonical dotted name that the
Bridges several evasion-prone spellings back to the canonical name that the
analyzers match against:

- ``from os import system`` → ``{"system": "os.system"}`` so a bare ``system``
call resolves to ``"os.system"``.
- ``import os as o`` → ``{"o": "os"}`` so ``o.system`` resolves to ``"os.system"``.
- ``from builtins import exec`` / ``import builtins; builtins.exec(...)`` → the
bare builtin ``exec`` (via :func:`_strip_builtins_prefix`), so dangerous
builtins matched by bare name are not hidden behind a ``builtins.`` qualifier.

Idempotent for already-canonical names (``os.system`` stays ``os.system``).
"""
if name in aliases:
return aliases[name]
return _strip_builtins_prefix(aliases[name])
root, sep, rest = name.partition(".")
if sep and root in aliases:
return f"{aliases[root]}.{rest}"
return name
return _strip_builtins_prefix(f"{aliases[root]}.{rest}")
return _strip_builtins_prefix(name)


def resolve_call_name(node: ast.Call, aliases: dict[str, str] | None = None) -> str | None:
Expand All @@ -138,6 +161,50 @@ def resolve_call_name(node: ast.Call, aliases: dict[str, str] | None = None) ->
return name


def _dynamic_import_target(node: ast.expr, aliases: dict[str, str] | None = None) -> str | None:
"""Return the imported module name for an ``importlib.import_module('mod')`` call.

Recognizes both ``importlib.import_module('os')`` and the bare-imported
``from importlib import import_module; import_module('os')`` (resolved via the
import-alias map), returning the string literal module name (``'os'``) when the
first positional argument is a constant. Returns ``None`` for anything else
(non-literal argument, unrelated call), so callers stay precise and avoid false
positives on dynamic module names the analyzer cannot resolve statically.
"""
if not isinstance(node, ast.Call):
return None
func_name = resolve_dotted_name(node.func)
if func_name is not None and aliases:
func_name = apply_import_aliases(func_name, aliases)
if func_name not in ("importlib.import_module", "import_module"):
return None
if node.args and isinstance(node.args[0], ast.Constant) and isinstance(node.args[0].value, str):
return node.args[0].value
return None


def resolve_dynamic_import_call(
node: ast.Call, aliases: dict[str, str] | None = None
) -> str | None:
"""Resolve ``importlib.import_module('mod').attr(...)`` to the dotted sink ``'mod.attr'``.

Bridges the dynamic-import evasion that mirrors ``__import__``: a skill writes
``importlib.import_module('os').system(cmd)`` (or imports ``import_module`` bare)
so the dangerous module never appears as a static ``import``. When *node*'s callee
is an attribute access on such a chain, this returns the canonical sink name
(``'os.system'``, ``'subprocess.run'``) that the existing sink ladders already
match. Returns ``None`` when the chain is not a literal dynamic import, keeping the
resolution precise (no false positives on un-resolvable dynamic names).
"""
func = node.func
if not isinstance(func, ast.Attribute):
return None
module_name = _dynamic_import_target(func.value, aliases)
if module_name is None:
return None
return f"{module_name}.{func.attr}"


def _build_import_aliases(tree: ast.Module) -> dict[str, str]:
"""Map locally imported names to their fully-qualified module paths.

Expand Down
89 changes: 89 additions & 0 deletions tests/nodes/analyzers/test_behavioral_ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,92 @@ def test_multiple_dangerous_calls_in_one_file(self):
assert "AST2" in rule_ids
assert "AST4" in rule_ids
assert "AST5" in rule_ids


# ── builtins / importlib import-chain evasion ─────────────────────────


class TestBuiltinsImportEvasion:
"""Dangerous builtins hidden behind the ``builtins`` module must still alert.

The analyzer matches dangerous builtins by their bare name (``exec``/``eval``/
``compile``/``__import__``). Writing ``from builtins import exec`` or
``import builtins; builtins.exec(...)`` resolves, through the import-alias map,
to the qualified spelling ``builtins.exec`` — which would slip past the bare-name
checks unless it is canonicalized back. Since ``builtins.exec is exec``, the
collapse is semantically exact. Complements the ``getattr`` branch (PR #166).
"""

def test_from_builtins_import_exec(self):
"""``from builtins import exec; exec(code)`` must still raise AST1."""
findings = _run("from builtins import exec\nexec('x = 1')\n")
assert any(f.rule_id == "AST1" for f in findings)

def test_from_builtins_import_eval(self):
"""``from builtins import eval`` must still raise AST2."""
findings = _run("from builtins import eval\neval('2 + 2')\n")
assert any(f.rule_id == "AST2" for f in findings)

def test_from_builtins_import_compile(self):
"""``from builtins import compile`` must still raise AST6."""
findings = _run("from builtins import compile\ncompile('x', '<s>', 'exec')\n")
assert any(f.rule_id == "AST6" for f in findings)

def test_from_builtins_import_dunder_import(self):
"""``from builtins import __import__`` must still raise AST3."""
findings = _run("from builtins import __import__\n__import__('os')\n")
assert any(f.rule_id == "AST3" for f in findings)

def test_import_builtins_dot_exec(self):
"""``import builtins; builtins.exec(...)`` must still raise AST1."""
findings = _run("import builtins\nbuiltins.exec('x = 1')\n")
assert any(f.rule_id == "AST1" for f in findings)

def test_import_builtins_as_alias_dot_exec(self):
"""``import builtins as b2; b2.exec(...)`` must still raise AST1."""
findings = _run("import builtins as b2\nb2.exec('x = 1')\n")
assert any(f.rule_id == "AST1" for f in findings)

def test_from_builtins_import_exec_as_alias(self):
"""``from builtins import exec as e; e(...)`` must still raise AST1."""
findings = _run("from builtins import exec as e\ne('x = 1')\n")
assert any(f.rule_id == "AST1" for f in findings)

def test_user_module_exec_helper_no_false_positive(self):
"""A benign helper merely *named* like a sink must not match (FP-neighbor).

``from mymod import exec_helper; exec_helper()`` imports an unrelated
third-party callable — it is not ``builtins.exec`` and must stay clean.
"""
findings = _run("from mymod import exec_helper\nexec_helper()\n")
assert findings == []


class TestImportlibDynamicChainEvasion:
"""``importlib.import_module('mod').attr(...)`` is a dynamic-import sink chain.

It mirrors ``__import__('mod')`` but lets the dangerous module name live in a
string literal so it never appears as a static ``import``. The chain is resolved
to the canonical dotted sink (``os.system``/``subprocess.run``) so it re-enters
the existing ``os.``/``subprocess.`` sink ladders.
"""

def test_importlib_import_module_os_system(self):
"""``importlib.import_module('os').system(...)`` must raise AST5."""
findings = _run("import importlib\nimportlib.import_module('os').system('id')\n")
assert any(f.rule_id == "AST5" for f in findings)

def test_importlib_import_module_subprocess_run(self):
"""``importlib.import_module('subprocess').run(...)`` must raise AST4."""
findings = _run("import importlib\nimportlib.import_module('subprocess').run(['id'])\n")
assert any(f.rule_id == "AST4" for f in findings)

def test_from_importlib_import_module_os_system(self):
"""Bare-imported ``import_module('os').system(...)`` must raise AST5."""
findings = _run("from importlib import import_module\nimport_module('os').system('id')\n")
assert any(f.rule_id == "AST5" for f in findings)

def test_importlib_import_module_benign_no_false_positive(self):
"""A benign dynamic import (``json.loads``) must not match a sink ladder."""
findings = _run("import importlib\nimportlib.import_module('json').loads('{}')\n")
assert findings == []
59 changes: 59 additions & 0 deletions tests/nodes/analyzers/test_behavioral_taint_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,3 +464,62 @@ def test_untyped_variable_no_false_positive(self):
)
findings = _run(code)
assert not any(f.rule_id == "TT4" for f in findings)


# ── builtins / importlib exec-sink evasion ────────────────────────────


class TestBuiltinsImportlibSinkEvasion:
"""Exec sinks reached via ``builtins.*`` or ``importlib.import_module`` must alert.

``_EXEC_SINKS`` matches by bare/qualified name (``"exec"``, ``"os.system"``).
``from builtins import exec`` resolves to ``builtins.exec`` (collapsed back to
``exec``) and ``importlib.import_module('subprocess').run`` resolves to the
canonical ``subprocess.run`` — both must re-enter the exec-sink path so a
user-input → exec flow is flagged as TT5. Complements the ``getattr`` branch
(PR #166): this covers the import/builtins/importlib branch.
"""

def test_from_builtins_import_exec_sink(self):
"""``from builtins import exec`` with tainted input must raise TT5."""
code = "from builtins import exec\ncode = input()\nexec(code)\n"
findings = _run(code)
assert any(f.rule_id == "TT5" for f in findings)

def test_import_builtins_dot_exec_sink(self):
"""``import builtins; builtins.exec(input())`` must raise TT5."""
code = "import builtins\ncode = input()\nbuiltins.exec(code)\n"
findings = _run(code)
assert any(f.rule_id == "TT5" for f in findings)

def test_import_builtins_as_alias_sink(self):
"""``import builtins as b2; b2.exec(input())`` must raise TT5."""
code = "import builtins as b2\ncode = input()\nb2.exec(code)\n"
findings = _run(code)
assert any(f.rule_id == "TT5" for f in findings)

def test_importlib_import_module_os_system_sink(self):
"""``importlib.import_module('os').system(input())`` must raise TT5."""
code = "import importlib\ncmd = input()\nimportlib.import_module('os').system(cmd)\n"
findings = _run(code)
assert any(f.rule_id == "TT5" for f in findings)

def test_importlib_import_module_subprocess_run_sink(self):
"""``importlib.import_module('subprocess').run(input())`` must raise TT5."""
code = "import importlib\ncmd = input()\nimportlib.import_module('subprocess').run(cmd)\n"
findings = _run(code)
assert any(f.rule_id == "TT5" for f in findings)

def test_from_importlib_import_module_sink(self):
"""Bare-imported ``import_module('os').system(input())`` must raise TT5."""
code = (
"from importlib import import_module\ncmd = input()\nimport_module('os').system(cmd)\n"
)
findings = _run(code)
assert any(f.rule_id == "TT5" for f in findings)

def test_importlib_benign_module_no_false_positive(self):
"""A benign dynamic import (``json.loads``) must not be treated as an exec sink."""
code = "import importlib\ndata = input()\nimportlib.import_module('json').loads(data)\n"
findings = _run(code)
assert not any(f.rule_id == "TT5" for f in findings)