Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 63 additions & 5 deletions src/skillspector/nodes/analyzers/behavioral_ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@
from skillspector.models import AnalyzerFinding, Finding, Location, Severity
from skillspector.state import AnalyzerNodeResponse, SkillspectorState

from .canonical_sink import (
canonical_sibling_method,
canonical_sibling_sink,
resolve_to_canonical_sink,
)
from .common import (
build_import_aliases,
build_type_map,
get_context_from_lines,
get_source_segment,
resolve_call_name,
resolve_dynamic_import_call,
)
from .static_runner import MAX_FILE_BYTES, analyzer_finding_to_finding

Expand Down Expand Up @@ -122,6 +129,29 @@
_TAG = "Dangerous Code Execution"


def _covered_by_ast9(node: ast.Call) -> bool:
"""True when *node* invokes a literal ``getattr`` already flagged by AST9.

AST9 catches ``getattr(obj, "<name>")(...)`` for ``<name>`` in
:data:`_DANGEROUS_GETATTR_NAMES`. The canonical-sink fallback skips exactly those so
the two rules stay complementary rather than double-firing: the canonical layer then
owns only the spellings AST9 does not (subscript ``__builtins__["exec"]`` /
``vars(builtins)["exec"]`` and getattr targets outside the allowlist such as
``getattr(subprocess, "Popen")``).
"""
callee = node.func
if not (isinstance(callee, ast.Call) and isinstance(callee.func, ast.Name)):
return False
if callee.func.id != "getattr" or len(callee.args) < 2:
return False
attr = callee.args[1]
return (
isinstance(attr, ast.Constant)
and isinstance(attr.value, str)
and attr.value in _DANGEROUS_GETATTR_NAMES
)


def _is_chain_sink(node: ast.Call, aliases: dict[str, str] | None = None) -> bool:
"""True if this call is exec(), eval(), or compile() — the outer dangerous call."""
name = resolve_call_name(node, aliases)
Expand Down Expand Up @@ -155,6 +185,7 @@ def _analyze_python(content: str, file_path: str) -> list[AnalyzerFinding]:
return []

aliases = build_import_aliases(tree)
type_map = build_type_map(tree)
lines = content.splitlines()
findings: list[AnalyzerFinding] = []

Expand Down Expand Up @@ -182,6 +213,32 @@ def _emit(
continue

call_name = resolve_call_name(ast_node, aliases)
if call_name is not None:
# Dynamic-import / code-exec sibling machinery (importlib.__import__,
# importlib.util.find_spec, runpy.run_module, code.interact, and the
# instance-method tails spec.loader.exec_module / runsource) resolves by name
# but matches no ladder; remap it to the primitive it equals so it re-enters
# the __import__/exec arms below.
call_name = (
canonical_sibling_sink(call_name)
or canonical_sibling_method(ast_node, type_map)
or call_name
)
if call_name is None:
# Dynamic-import chain: importlib.import_module('os').system(...) →
# 'os.system', so it re-enters the os./subprocess. sink ladders below.
call_name = resolve_dynamic_import_call(ast_node, aliases)
reflective = False
if call_name is None and not _covered_by_ast9(ast_node):
# Reflective invocation whose callee does not resolve by name:
# getattr(subprocess, "Popen")(cmd) (outside AST9's allowlist),
# __builtins__["exec"](src), vars(builtins)["exec"](src). Canonicalize to
# the bare/dotted sink id so it re-enters the ladders below with the correct
# rule + severity. Literal getattr() on an AST9-allowlisted name is left to
# AST9 (see _covered_by_ast9) so the two rules stay complementary, not
# duplicate.
call_name = resolve_to_canonical_sink(ast_node, aliases)
reflective = call_name is not None
if call_name is None:
continue

Expand Down Expand Up @@ -211,7 +268,11 @@ def _emit(
elif call_name.startswith("subprocess."):
attr = call_name.split(".", 1)[1]
if attr in _SUBPROCESS_CALLS:
_emit("AST4", lineno, end_lineno)
# A *reflective* subprocess invocation (getattr/subscript) signals the
# same evasion intent as reflective os.system, so it grades AST9-HIGH to
# stay consistent with that case; a direct subprocess.* call keeps its
# baseline AST4-MEDIUM.
_emit("AST9" if reflective else "AST4", lineno, end_lineno)

elif call_name.startswith("os."):
attr = call_name.split(".", 1)[1]
Expand All @@ -222,10 +283,7 @@ def _emit(
second_arg = ast_node.args[1]
if not isinstance(second_arg, ast.Constant):
_emit("AST7", lineno, end_lineno)
elif (
isinstance(second_arg.value, str)
and second_arg.value in _DANGEROUS_GETATTR_NAMES
):
elif isinstance(second_arg.value, str) and second_arg.value in _DANGEROUS_GETATTR_NAMES:
_emit("AST9", lineno, end_lineno)

return findings
Expand Down
19 changes: 18 additions & 1 deletion src/skillspector/nodes/analyzers/behavioral_taint_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from skillspector.models import AnalyzerFinding, Finding, Location, Severity
from skillspector.state import AnalyzerNodeResponse, SkillspectorState

from .canonical_sink import resolve_to_canonical_sink
from .common import (
apply_import_aliases,
build_import_aliases,
Expand Down Expand Up @@ -170,6 +171,22 @@
]


def _resolve_sink_name(
node: ast.Call,
type_map: dict[str, str] | None = None,
aliases: dict[str, str] | None = None,
) -> str | None:
"""Resolve a call to its canonical sink name through the canonicalization chokepoint.

Delegates to :func:`resolve_to_canonical_sink`, which reduces every spelling —
type-/alias-aware names, ``importlib.import_module('subprocess').run(...)``,
reflective ``getattr(os, "system")(...)`` and subscript ``__builtins__["exec"](...)``
/ ``vars(builtins)["exec"](...)`` — to the bare/dotted id so it re-enters
``_EXEC_SINKS`` like the statically-imported form would.
"""
return resolve_to_canonical_sink(node, aliases, type_map)


def _classify(name: str, categories: list[tuple[frozenset[str], str]], default: str) -> str:
for names, label in categories:
if name in names:
Expand Down Expand Up @@ -363,7 +380,7 @@ def _emit(
if not isinstance(ast_node, ast.Call):
continue

sink_name = resolve_call_name_typed(ast_node, type_map, aliases)
sink_name = _resolve_sink_name(ast_node, type_map, aliases)
if not sink_name or sink_name not in _ALL_SINKS:
continue

Expand Down
Loading