diff --git a/factory/eval/hygiene.py b/factory/eval/hygiene.py index 46c3fbe..37e4844 100644 --- a/factory/eval/hygiene.py +++ b/factory/eval/hygiene.py @@ -1,11 +1,11 @@ """Universal hygiene eval dimensions applied to every factory-managed project. -These 6 dimensions are mandatory and cannot be removed. They are computed by +These 7 dimensions are mandatory and cannot be removed. They are computed by the factory itself (not by per-project eval/score.py) and auto-detect the project's tooling. Projects can ADD dimensions via eval/score.py but cannot remove any of these. -Together with the 5 growth dimensions in growth.py, these form the 11 +Together with the 5 growth dimensions in growth.py, these form the 12 mandatory eval dimensions that define the factory's quality baseline. All functions take a project_path and return an EvalResult-compatible dict. @@ -17,15 +17,20 @@ import subprocess from pathlib import Path +import structlog + +log = structlog.get_logger() + # Relative weights within the hygiene category (sum to 1.0). # The runner normalizes these so that hygiene gets 50% of the composite. HYGIENE_WEIGHTS = { - "tests": 0.30, - "lint": 0.15, - "type_check": 0.10, - "coverage": 0.25, - "guard_patterns": 0.10, - "config_parser": 0.10, + "tests": 0.28, + "lint": 0.14, + "type_check": 0.09, + "coverage": 0.23, + "guard_patterns": 0.09, + "config_parser": 0.09, + "security": 0.08, } @@ -523,11 +528,57 @@ def eval_config_parser(project_path: Path) -> dict: } +# ── Dimension 7: security (weight 0.08) ───────────────────────── + + +def eval_security(project_path: Path) -> dict: + """Run security scanners via the pluggable scanner registry. + + Delegates to factory.security.ScannerRegistry for scanner detection and + execution. Each registered scanner (bandit, npm-audit, semgrep, trivy, + git-secrets) auto-detects applicability and runs if appropriate. + + Scoring: partial credit, deducting 0.1 per issue found across all scanners. 
+ Returns neutral (0.5) when no scanner is applicable. + """ + from factory.security import get_default_registry + + registry = get_default_registry() + sub_projects = _find_sub_projects(project_path) + total_issues = 0 + ran_any = False + details_parts: list[str] = [] + + for sp in sub_projects: + results = registry.scan(sp) + for result in results: + ran_any = True + count = result.issue_count + total_issues += count + label = f"{sp.name}({result.scanner_name})" + if count > 0: + details_parts.append(f"{label}: {count} issues") + else: + details_parts.append(f"{label}: clean") + + if not ran_any: + return _neutral("security", "no security scanner detected") + + score = max(0.0, 1.0 - total_issues * 0.1) + return { + "name": "security", + "score": round(score, 4), + "weight": HYGIENE_WEIGHTS["security"], + "passed": total_issues == 0, + "details": "; ".join(details_parts), + } + + # ── Public API ───────────────────────────────────────────────────── def compute_hygiene_results(project_path: Path) -> list[dict]: - """Compute all 6 mandatory hygiene dimensions for a project.""" + """Compute all 7 mandatory hygiene dimensions for a project.""" return [ eval_tests(project_path), eval_lint(project_path), @@ -535,4 +586,5 @@ def compute_hygiene_results(project_path: Path) -> list[dict]: eval_coverage(project_path), eval_guard_patterns(project_path), eval_config_parser(project_path), + eval_security(project_path), ] diff --git a/factory/eval/runner.py b/factory/eval/runner.py index a4e1da2..25227bb 100644 --- a/factory/eval/runner.py +++ b/factory/eval/runner.py @@ -1,7 +1,8 @@ """EvalRunner — compute mandatory dimensions and merge with project-specific evals. 
The factory's eval system has mandatory dimensions that apply to every project: - - 6 hygiene dimensions (tests, lint, type_check, coverage, guard_patterns, config_parser) + - 7 hygiene dimensions (tests, lint, type_check, coverage, guard_patterns, + config_parser, security) - 5 growth dimensions (capability_surface, experiment_diversity, observability, research_grounding, factory_effectiveness) @@ -123,7 +124,7 @@ async def _run_project_eval( """Run the project's eval/score.py (if it exists) and return additional results. Returns an empty list if the command fails or returns no results. - These are project-specific ADDITIONS to the mandatory 11 dimensions. + These are project-specific ADDITIONS to the mandatory 12 dimensions. """ parts = eval_command.split() diff --git a/factory/security/__init__.py b/factory/security/__init__.py new file mode 100644 index 0000000..c05c68d --- /dev/null +++ b/factory/security/__init__.py @@ -0,0 +1,154 @@ +"""Security scanning subsystem: pluggable scanner architecture. + +Provides a Protocol-based scanner interface, a registry with auto-detection, +and concrete scanner implementations for multiple security tools. + +Usage: + from factory.security import get_default_registry + + registry = get_default_registry() + results = registry.scan(project_path) +""" + +from __future__ import annotations + +import structlog +from pathlib import Path +from typing import Protocol, runtime_checkable + +from factory.security.models import SecurityScanResult + +log = structlog.get_logger() + + +@runtime_checkable +class SecurityScanner(Protocol): + """Interface for security scanning tools. + + Each scanner must implement: + - name: human-readable scanner identifier + - detect: check if the scanner is applicable to the project + - run: execute the scan and return structured results + """ + + @property + def name(self) -> str: + """Human-readable name for this scanner (e.g. 'bandit', 'npm-audit').""" + ...
+ + def detect(self, project_path: Path) -> bool: + """Return True if this scanner is applicable to the given project. + + This checks both project compatibility (e.g. Python project for bandit) + and tool availability (e.g. bandit is installed). + """ + ... + + def run(self, project_path: Path) -> SecurityScanResult: + """Execute the security scan and return structured results. + + Should not raise exceptions. If the scanner fails, return a + SecurityScanResult with passed=False and details explaining the failure. + """ + ... + + +class ScannerRegistry: + """Registry of security scanners with auto-detection. + + Scanners are added with register() and auto-detected per project path. + Sub-project discovery is the caller's job: detect() and scan() operate + only on the single project root they are given. + """ + + def __init__(self) -> None: + self._scanners: list[SecurityScanner] = [] + + def register(self, scanner: SecurityScanner) -> None: + """Add a scanner to the registry.""" + self._scanners.append(scanner) + + @property + def scanners(self) -> list[SecurityScanner]: + """All registered scanners.""" + return list(self._scanners) + + def detect(self, project_path: Path) -> list[SecurityScanner]: + """Return scanners applicable to the given project path.""" + applicable = [] + for scanner in self._scanners: + try: + if scanner.detect(project_path): + applicable.append(scanner) + except Exception: + log.warning("scanner_detect_error", scanner=scanner.name, exc_info=True) + return applicable + + def scan(self, project_path: Path) -> list[SecurityScanResult]: + """Run all applicable scanners against the project. + + Returns a list of SecurityScanResult, one per scanner that ran. + Scanners that are not applicable (detect returns False) are skipped.
+ """ + results: list[SecurityScanResult] = [] + applicable = self.detect(project_path) + + if not applicable: + log.debug("no_applicable_scanners", project=str(project_path)) + return results + + for scanner in applicable: + try: + result = scanner.run(project_path) + results.append(result) + log.debug( + "scanner_completed", + scanner=scanner.name, + issues=result.issue_count, + passed=result.passed, + ) + except Exception: + log.warning("scanner_run_error", scanner=scanner.name, exc_info=True) + results.append( + SecurityScanResult( + scanner_name=scanner.name, + passed=False, + details=f"Scanner {scanner.name} failed with an unexpected error", + ) + ) + + return results + + +# Global default registry instance, pre-populated with all built-in scanners. +_default_registry: ScannerRegistry | None = None + + +def get_default_registry() -> ScannerRegistry: + """Return the global default scanner registry, creating it on first call. + + Lazily imports and registers all built-in scanners to avoid circular + imports and unnecessary work if the security subsystem is not used. + """ + global _default_registry + if _default_registry is not None: + return _default_registry + + _default_registry = ScannerRegistry() + + # Import and register built-in scanners + from factory.security.scanners import ( + BanditScanner, + GitSecretsScanner, + NpmAuditScanner, + SemgrepScanner, + TrivyScanner, + ) + + _default_registry.register(BanditScanner()) + _default_registry.register(NpmAuditScanner()) + _default_registry.register(SemgrepScanner()) + _default_registry.register(TrivyScanner()) + _default_registry.register(GitSecretsScanner()) + + return _default_registry diff --git a/factory/security/models.py b/factory/security/models.py new file mode 100644 index 0000000..bca1721 --- /dev/null +++ b/factory/security/models.py @@ -0,0 +1,55 @@ +"""Data models for the security scanning subsystem. + +SecurityIssue represents a single finding from any scanner. 
+SecurityScanResult aggregates issues from a single scanner run. +""" + +from __future__ import annotations + +from enum import Enum + +from pydantic import BaseModel, ConfigDict + + +class SecuritySeverity(str, Enum): + """Severity levels for security findings, ordered from most to least critical.""" + + CRITICAL = "critical" + HIGH = "high" + MEDIUM = "medium" + LOW = "low" + INFO = "info" + + +class SecurityIssue(BaseModel): + """A single security finding from a scanner.""" + + model_config = ConfigDict(strict=True, extra="forbid") + + severity: SecuritySeverity + category: str + file: str = "" + line: int | None = None + message: str = "" + remediation: str = "" + scanner: str = "" + + +class SecurityScanResult(BaseModel): + """Aggregated result from a single scanner run.""" + + model_config = ConfigDict(strict=True, extra="forbid") + + scanner_name: str + issues: list[SecurityIssue] = [] + passed: bool = True + details: str = "" + duration_seconds: float = 0.0 + + @property + def issue_count(self) -> int: + return len(self.issues) + + def issues_by_severity(self, severity: SecuritySeverity) -> list[SecurityIssue]: + """Filter issues by severity level.""" + return [i for i in self.issues if i.severity == severity] diff --git a/factory/security/scanners.py b/factory/security/scanners.py new file mode 100644 index 0000000..58be70c --- /dev/null +++ b/factory/security/scanners.py @@ -0,0 +1,502 @@ +"""Built-in security scanner implementations. 
+ +Each scanner follows the SecurityScanner protocol: + - detect(): check project compatibility + tool availability + - run(): execute scan, parse output, return SecurityScanResult + +Concrete scanners: + - BanditScanner: Python static analysis (bandit) + - NpmAuditScanner: Node.js dependency audit (npm audit) + - SemgrepScanner: Multi-language pattern matching (semgrep) + - TrivyScanner: Container/filesystem vulnerability scanning (trivy) + - GitSecretsScanner: Hardcoded secrets detection (git-secrets) +""" + +from __future__ import annotations + +import json +import os +import subprocess +import time +from pathlib import Path + +import structlog + +from factory.security.models import SecurityIssue, SecurityScanResult, SecuritySeverity + +log = structlog.get_logger() + + +# ── Shared helpers ─────────────────────────────────────────────── + + +def _run_cmd( + cmd: list[str], + cwd: Path, + timeout: int = 120, +) -> tuple[int, str, str]: + """Run a command, return (returncode, stdout, stderr). 
Never raises.""" + env = {k: v for k, v in os.environ.items() if k != "VIRTUAL_ENV"} + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout, + cwd=cwd, + env=env, + ) + return result.returncode, result.stdout, result.stderr + except subprocess.TimeoutExpired: + return 1, "", f"Timed out after {timeout}s" + except FileNotFoundError: + return 1, "", f"Command not found: {cmd[0]}" + except Exception as exc: + return 1, "", str(exc) + + +def _tool_available(cmd: list[str]) -> bool: + """Check if a CLI tool is available by running a version/help command.""" + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=10, + ) + return result.returncode == 0 + except (FileNotFoundError, subprocess.TimeoutExpired, OSError): + return False + + +def _detect_python_project(project_path: Path) -> bool: + return (project_path / "pyproject.toml").exists() or (project_path / "setup.py").exists() + + +def _detect_node_project(project_path: Path) -> bool: + return (project_path / "package.json").exists() + + +# ── Severity mapping helpers ───────────────────────────────────── + + +_BANDIT_SEVERITY_MAP: dict[str, SecuritySeverity] = { + "HIGH": SecuritySeverity.HIGH, + "MEDIUM": SecuritySeverity.MEDIUM, + "LOW": SecuritySeverity.LOW, +} + +_NPM_SEVERITY_MAP: dict[str, SecuritySeverity] = { + "critical": SecuritySeverity.CRITICAL, + "high": SecuritySeverity.HIGH, + "moderate": SecuritySeverity.MEDIUM, + "low": SecuritySeverity.LOW, + "info": SecuritySeverity.INFO, +} + + +# ── BanditScanner ──────────────────────────────────────────────── + + +class BanditScanner: + """Python security scanner using bandit. + + Runs `bandit -r . -f json -q` and parses the JSON output to extract + individual security issues with severity, file location, and remediation. 
+ """ + + @property + def name(self) -> str: + return "bandit" + + def detect(self, project_path: Path) -> bool: + """Detect if this is a Python project and bandit is available.""" + if not _detect_python_project(project_path): + return False + return _tool_available(["python", "-m", "bandit", "--version"]) + + def run(self, project_path: Path) -> SecurityScanResult: + """Run bandit and parse JSON results.""" + start = time.monotonic() + rc, stdout, stderr = _run_cmd( + ["python", "-m", "bandit", "-r", ".", "-f", "json", "-q"], + project_path, + ) + duration = time.monotonic() - start + + if rc == 1 and "Command not found" in stderr: + return SecurityScanResult( + scanner_name=self.name, + passed=True, + details="bandit not installed", + duration_seconds=duration, + ) + + issues: list[SecurityIssue] = [] + try: + data = json.loads(stdout) if stdout.strip() else {} + for finding in data.get("results", []): + severity_str = finding.get("issue_severity", "LOW") + severity = _BANDIT_SEVERITY_MAP.get(severity_str, SecuritySeverity.LOW) + issues.append( + SecurityIssue( + severity=severity, + category=finding.get("test_id", "unknown"), + file=finding.get("filename", ""), + line=finding.get("line_number"), + message=finding.get("issue_text", ""), + remediation=finding.get("more_info", ""), + scanner=self.name, + ) + ) + except (json.JSONDecodeError, TypeError): + log.warning("bandit_parse_error", stdout=stdout[:200]) + + passed = len(issues) == 0 + if issues: + details = f"{len(issues)} issues found" + else: + details = "clean" + + return SecurityScanResult( + scanner_name=self.name, + issues=issues, + passed=passed, + details=details, + duration_seconds=duration, + ) + + +# ── NpmAuditScanner ────────────────────────────────────────────── + + +class NpmAuditScanner: + """Node.js dependency vulnerability scanner using npm audit. + + Runs `npm audit --json` and parses the vulnerability metadata + to extract counts by severity level. 
+ """ + + @property + def name(self) -> str: + return "npm-audit" + + def detect(self, project_path: Path) -> bool: + """Detect if this is a Node project with a lockfile.""" + if not _detect_node_project(project_path): + return False + # npm audit requires a lockfile + has_lockfile = ( + (project_path / "package-lock.json").exists() + or (project_path / "npm-shrinkwrap.json").exists() + ) + return has_lockfile and _tool_available(["npm", "--version"]) + + def run(self, project_path: Path) -> SecurityScanResult: + """Run npm audit and parse JSON results.""" + start = time.monotonic() + rc, stdout, stderr = _run_cmd( + ["npm", "audit", "--json"], + project_path, + timeout=180, + ) + duration = time.monotonic() - start + + if rc == 1 and "Command not found" in stderr: + return SecurityScanResult( + scanner_name=self.name, + passed=True, + details="npm not installed", + duration_seconds=duration, + ) + + issues: list[SecurityIssue] = [] + try: + data = json.loads(stdout) if stdout.strip() else {} + vulns = data.get("metadata", {}).get("vulnerabilities", {}) + for sev_name, count in vulns.items(): + if count > 0: + severity = _NPM_SEVERITY_MAP.get(sev_name, SecuritySeverity.LOW) + # npm audit gives counts per severity, not individual issues. + # Create one issue per severity level with the count. 
+ issues.append( + SecurityIssue( + severity=severity, + category="dependency_vulnerability", + message=f"{count} {sev_name} vulnerabilit{'y' if count == 1 else 'ies'}", + scanner=self.name, + ) + ) + except (json.JSONDecodeError, TypeError): + log.warning("npm_audit_parse_error", stdout=stdout[:200]) + + # Total vulnerability count from metadata + total_vulns = 0 + try: + data = json.loads(stdout) if stdout.strip() else {} + vulns = data.get("metadata", {}).get("vulnerabilities", {}) + total_vulns = sum(vulns.get(s, 0) for s in ("low", "moderate", "high", "critical")) + except (json.JSONDecodeError, TypeError, ValueError): + pass + + passed = total_vulns == 0 + if total_vulns > 0: + details = f"{total_vulns} vulnerabilities" + else: + details = "clean" + + return SecurityScanResult( + scanner_name=self.name, + issues=issues, + passed=passed, + details=details, + duration_seconds=duration, + ) + + +# ── SemgrepScanner ─────────────────────────────────────────────── + + +class SemgrepScanner: + """Multi-language static analysis using semgrep. + + Runs `semgrep scan --json --config auto` for broad rule coverage. + Supports Python, JavaScript, TypeScript, Go, Ruby, Java, and more. + """ + + @property + def name(self) -> str: + return "semgrep" + + def detect(self, project_path: Path) -> bool: + """Detect if semgrep is installed. 
Semgrep supports many languages, + so project type detection is not needed.""" + return _tool_available(["semgrep", "--version"]) + + def run(self, project_path: Path) -> SecurityScanResult: + """Run semgrep and parse JSON results.""" + start = time.monotonic() + rc, stdout, stderr = _run_cmd( + ["semgrep", "scan", "--json", "--config", "auto", "--quiet"], + project_path, + timeout=300, + ) + duration = time.monotonic() - start + + if rc == 1 and "Command not found" in stderr: + return SecurityScanResult( + scanner_name=self.name, + passed=True, + details="semgrep not installed", + duration_seconds=duration, + ) + + issues: list[SecurityIssue] = [] + try: + data = json.loads(stdout) if stdout.strip() else {} + for finding in data.get("results", []): + sev_str = finding.get("extra", {}).get("severity", "WARNING") + severity = _semgrep_severity(sev_str) + issues.append( + SecurityIssue( + severity=severity, + category=finding.get("check_id", "unknown"), + file=finding.get("path", ""), + line=finding.get("start", {}).get("line"), + message=finding.get("extra", {}).get("message", ""), + remediation=finding.get("extra", {}).get("fix", ""), + scanner=self.name, + ) + ) + except (json.JSONDecodeError, TypeError): + log.warning("semgrep_parse_error", stdout=stdout[:200]) + + passed = len(issues) == 0 + details = f"{len(issues)} findings" if issues else "clean" + + return SecurityScanResult( + scanner_name=self.name, + issues=issues, + passed=passed, + details=details, + duration_seconds=duration, + ) + + +def _semgrep_severity(sev: str) -> SecuritySeverity: + """Map semgrep severity string to SecuritySeverity.""" + mapping = { + "ERROR": SecuritySeverity.HIGH, + "WARNING": SecuritySeverity.MEDIUM, + "INFO": SecuritySeverity.LOW, + } + return mapping.get(sev.upper(), SecuritySeverity.MEDIUM) + + +# ── TrivyScanner ───────────────────────────────────────────────── + + +class TrivyScanner: + """Filesystem and container vulnerability scanner using trivy. 
+ + Runs `trivy fs --format json --scanners vuln,secret .` to scan + for known vulnerabilities in dependencies and hardcoded secrets. + """ + + @property + def name(self) -> str: + return "trivy" + + def detect(self, project_path: Path) -> bool: + """Detect if trivy is installed.""" + return _tool_available(["trivy", "--version"]) + + def run(self, project_path: Path) -> SecurityScanResult: + """Run trivy filesystem scan and parse JSON results.""" + start = time.monotonic() + rc, stdout, stderr = _run_cmd( + ["trivy", "fs", "--format", "json", "--scanners", "vuln,secret", "."], + project_path, + timeout=300, + ) + duration = time.monotonic() - start + + if rc == 1 and "Command not found" in stderr: + return SecurityScanResult( + scanner_name=self.name, + passed=True, + details="trivy not installed", + duration_seconds=duration, + ) + + issues: list[SecurityIssue] = [] + try: + data = json.loads(stdout) if stdout.strip() else {} + for result_entry in data.get("Results", []): + target = result_entry.get("Target", "") + for vuln in result_entry.get("Vulnerabilities", []): + severity = _trivy_severity(vuln.get("Severity", "UNKNOWN")) + issues.append( + SecurityIssue( + severity=severity, + category=vuln.get("VulnerabilityID", "unknown"), + file=target, + message=vuln.get("Title", vuln.get("Description", "")), + remediation=vuln.get("FixedVersion", ""), + scanner=self.name, + ) + ) + for secret in result_entry.get("Secrets", []): + issues.append( + SecurityIssue( + severity=SecuritySeverity.HIGH, + category="hardcoded_secret", + file=target, + line=secret.get("StartLine"), + message=secret.get("Title", "Hardcoded secret detected"), + scanner=self.name, + ) + ) + except (json.JSONDecodeError, TypeError): + log.warning("trivy_parse_error", stdout=stdout[:200]) + + passed = len(issues) == 0 + details = f"{len(issues)} findings" if issues else "clean" + + return SecurityScanResult( + scanner_name=self.name, + issues=issues, + passed=passed, + details=details, +
duration_seconds=duration, + ) + + +def _trivy_severity(sev: str) -> SecuritySeverity: + """Map trivy severity string to SecuritySeverity.""" + mapping = { + "CRITICAL": SecuritySeverity.CRITICAL, + "HIGH": SecuritySeverity.HIGH, + "MEDIUM": SecuritySeverity.MEDIUM, + "LOW": SecuritySeverity.LOW, + "UNKNOWN": SecuritySeverity.INFO, + } + return mapping.get(sev.upper(), SecuritySeverity.MEDIUM) + + +# ── GitSecretsScanner ──────────────────────────────────────────── + + +class GitSecretsScanner: + """Hardcoded secrets detector using git-secrets. + + Runs `git secrets --scan` to detect AWS keys, passwords, and other + sensitive data committed to the repository. + """ + + @property + def name(self) -> str: + return "git-secrets" + + def detect(self, project_path: Path) -> bool: + """Detect if git-secrets is installed and this is a git repo.""" + if not (project_path / ".git").exists(): + return False + return _tool_available(["git", "secrets", "--list"]) + + def run(self, project_path: Path) -> SecurityScanResult: + """Run git-secrets scan and parse output.""" + start = time.monotonic() + rc, stdout, stderr = _run_cmd( + ["git", "secrets", "--scan", "-r", "."], + project_path, + ) + duration = time.monotonic() - start + + if rc == 1 and "Command not found" in stderr: + return SecurityScanResult( + scanner_name=self.name, + passed=True, + details="git-secrets not installed", + duration_seconds=duration, + ) + + issues: list[SecurityIssue] = [] + if rc != 0 and stdout.strip(): + # git-secrets outputs matched lines as: filename:line_number:matched_content + for line in stdout.strip().splitlines(): + parts = line.split(":", 2) + file_path = parts[0] if len(parts) > 0 else "" + line_num = None + if len(parts) > 1: + try: + line_num = int(parts[1]) + except ValueError: + pass + issues.append( + SecurityIssue( + severity=SecuritySeverity.HIGH, + category="hardcoded_secret", + file=file_path, + line=line_num, + message="Potential secret or credential detected", + 
remediation="Remove the secret and rotate credentials", + scanner=self.name, + ) + ) + + passed = len(issues) == 0 + if issues: + details = f"{len(issues)} potential secrets found" + elif rc == 0: + details = "clean" + else: + details = "scan completed with warnings" + + return SecurityScanResult( + scanner_name=self.name, + issues=issues, + passed=passed, + details=details, + duration_seconds=duration, + ) diff --git a/tests/eval/test_hygiene.py b/tests/eval/test_hygiene.py index 7f96172..078f21b 100644 --- a/tests/eval/test_hygiene.py +++ b/tests/eval/test_hygiene.py @@ -18,9 +18,10 @@ def test_weights_sum_to_one(self): total = sum(HYGIENE_WEIGHTS.values()) assert abs(total - 1.0) < 1e-9 - def test_all_six_dimensions(self): + def test_all_seven_dimensions(self): assert set(HYGIENE_WEIGHTS.keys()) == { "tests", "lint", "type_check", "coverage", "guard_patterns", "config_parser", + "security", } @@ -125,11 +126,11 @@ def test_valid_factory_md(self, tmp_path): class TestComputeHygieneResults: - def test_returns_all_six(self, tmp_path): + def test_returns_all_seven(self, tmp_path): results = compute_hygiene_results(tmp_path) - assert len(results) == 6 + assert len(results) == 7 names = {r["name"] for r in results} - assert names == {"tests", "lint", "type_check", "coverage", "guard_patterns", "config_parser"} + assert names == {"tests", "lint", "type_check", "coverage", "guard_patterns", "config_parser", "security"} def test_all_have_required_keys(self, tmp_path): results = compute_hygiene_results(tmp_path) diff --git a/tests/eval/test_runner.py b/tests/eval/test_runner.py index 38a58fc..b49420b 100644 --- a/tests/eval/test_runner.py +++ b/tests/eval/test_runner.py @@ -7,26 +7,27 @@ class TestRunEval: async def test_always_has_mandatory_dimensions(self, tmp_path): - """Even with no project eval, all 11 mandatory dimensions are present.""" + """Even with no project eval, all 12 mandatory dimensions are present.""" # No eval/score.py — just mandatory dimensions result 
= await run_eval("true", tmp_path, threshold=0.0) names = {r.name for r in result.results} - # 6 hygiene + 5 growth = 11 mandatory + # 7 hygiene + 5 growth = 12 mandatory assert "tests" in names assert "lint" in names assert "type_check" in names assert "coverage" in names assert "guard_patterns" in names assert "config_parser" in names + assert "security" in names assert "capability_surface" in names assert "experiment_diversity" in names assert "observability" in names assert "research_grounding" in names assert "factory_effectiveness" in names - assert len(result.results) >= 11 + assert len(result.results) >= 12 async def test_project_additions_merged(self, tmp_path): - """Project eval/score.py can add extra dimensions beyond the 11.""" + """Project eval/score.py can add extra dimensions beyond the 12.""" script = tmp_path / "score.py" script.write_text( 'import json, sys\n' @@ -37,10 +38,10 @@ async def test_project_additions_merged(self, tmp_path): ) result = await run_eval(f"{sys.executable} {script}", tmp_path, threshold=0.0) names = {r.name for r in result.results} - # 11 mandatory + 2 project additions + # 12 mandatory + 2 project additions assert "ui_renders" in names assert "api_health" in names - assert len(result.results) >= 13 + assert len(result.results) >= 14 async def test_project_cannot_override_mandatory(self, tmp_path): """If project eval returns a dimension with the same name as mandatory, it's ignored.""" @@ -61,8 +62,8 @@ async def test_failed_project_eval_still_has_mandatory(self, tmp_path): """If project eval command fails, mandatory dimensions still run.""" result = await run_eval("nonexistent_command_xyz", tmp_path, threshold=0.0) names = {r.name for r in result.results} - # All 11 mandatory should still be present - assert len(names) >= 11 + # All 12 mandatory should still be present + assert len(names) >= 12 assert "tests" in names assert "capability_surface" in names @@ -79,12 +80,12 @@ async def test_timeout_project_eval(self, 
tmp_path): result = await run_eval(f"{sys.executable} {script}", tmp_path, threshold=0.0, timeout=1.0) # Mandatory dimensions still computed names = {r.name for r in result.results} - assert len(names) >= 11 + assert len(names) >= 12 async def test_weight_split_is_50_50(self, tmp_path): """Hygiene dimensions get 50% total weight, growth gets 50%.""" result = await run_eval("true", tmp_path, threshold=0.0) - hygiene_names = {"tests", "lint", "type_check", "coverage", "guard_patterns", "config_parser"} + hygiene_names = {"tests", "lint", "type_check", "coverage", "guard_patterns", "config_parser", "security"} growth_names = { "capability_surface", "experiment_diversity", "observability", "research_grounding", "factory_effectiveness", diff --git a/tests/test_security.py b/tests/test_security.py new file mode 100644 index 0000000..ce11f6e --- /dev/null +++ b/tests/test_security.py @@ -0,0 +1,599 @@ +"""Tests for factory.security — pluggable scanner architecture. + +Covers: + - SecurityIssue and SecurityScanResult models + - ScannerRegistry (registration, detection, scanning) + - BanditScanner, NpmAuditScanner (with mocked subprocess) + - SemgrepScanner, TrivyScanner, GitSecretsScanner (with mocked subprocess) + - eval_security() integration via the registry +""" + +import json +from pathlib import Path +from unittest.mock import patch + +from factory.security import ScannerRegistry, get_default_registry +from factory.security.models import ( + SecurityIssue, + SecurityScanResult, + SecuritySeverity, +) +from factory.security.scanners import ( + BanditScanner, + GitSecretsScanner, + NpmAuditScanner, + SemgrepScanner, + TrivyScanner, +) + + +# ── Model tests ────────────────────────────────────────────────── + + +class TestSecuritySeverity: + def test_ordering(self): + """Severity enum values are strings.""" + assert SecuritySeverity.CRITICAL.value == "critical" + assert SecuritySeverity.HIGH.value == "high" + assert SecuritySeverity.INFO.value == "info" + + +class 
TestSecurityIssue: + def test_minimal_issue(self): + issue = SecurityIssue( + severity=SecuritySeverity.HIGH, + category="B101", + scanner="bandit", + ) + assert issue.severity == SecuritySeverity.HIGH + assert issue.file == "" + assert issue.line is None + assert issue.remediation == "" + + def test_full_issue(self): + issue = SecurityIssue( + severity=SecuritySeverity.CRITICAL, + category="dependency_vulnerability", + file="package-lock.json", + line=42, + message="Known vulnerability in lodash", + remediation="Upgrade to lodash>=4.17.21", + scanner="npm-audit", + ) + assert issue.file == "package-lock.json" + assert issue.line == 42 + assert issue.scanner == "npm-audit" + + +class TestSecurityScanResult: + def test_empty_result(self): + result = SecurityScanResult(scanner_name="test") + assert result.passed is True + assert result.issue_count == 0 + assert result.issues == [] + + def test_result_with_issues(self): + issues = [ + SecurityIssue(severity=SecuritySeverity.HIGH, category="B101", scanner="bandit"), + SecurityIssue(severity=SecuritySeverity.LOW, category="B105", scanner="bandit"), + ] + result = SecurityScanResult( + scanner_name="bandit", + issues=issues, + passed=False, + details="2 issues found", + ) + assert result.issue_count == 2 + assert result.passed is False + + def test_issues_by_severity(self): + issues = [ + SecurityIssue(severity=SecuritySeverity.HIGH, category="A", scanner="test"), + SecurityIssue(severity=SecuritySeverity.LOW, category="B", scanner="test"), + SecurityIssue(severity=SecuritySeverity.HIGH, category="C", scanner="test"), + ] + result = SecurityScanResult(scanner_name="test", issues=issues, passed=False) + highs = result.issues_by_severity(SecuritySeverity.HIGH) + assert len(highs) == 2 + lows = result.issues_by_severity(SecuritySeverity.LOW) + assert len(lows) == 1 + crits = result.issues_by_severity(SecuritySeverity.CRITICAL) + assert len(crits) == 0 + + +# ── Registry tests ─────────────────────────────────────────────── 
class _StubScanner:
    """Minimal scanner for testing the registry.

    Its behaviour is fixed at construction time: ``detects`` is the value
    returned by ``detect()`` and ``issue_count`` is the number of LOW-severity
    placeholder issues that ``run()`` reports.
    """

    def __init__(self, scanner_name: str, detects: bool = True, issue_count: int = 0):
        self._name = scanner_name
        self._detects = detects
        self._issue_count = issue_count

    @property
    def name(self) -> str:
        return self._name

    def detect(self, project_path: Path) -> bool:
        # project_path is ignored on purpose: the answer is canned.
        return self._detects

    def run(self, project_path: Path) -> SecurityScanResult:
        issues = [
            SecurityIssue(severity=SecuritySeverity.LOW, category="test", scanner=self._name)
            for _ in range(self._issue_count)
        ]
        return SecurityScanResult(
            scanner_name=self._name,
            issues=issues,
            passed=self._issue_count == 0,
            details="clean" if self._issue_count == 0 else f"{self._issue_count} issues",
        )


class TestScannerRegistry:
    """Registration, detection and scanning behaviour of ScannerRegistry."""

    def test_empty_registry(self, tmp_path):
        registry = ScannerRegistry()
        assert registry.scanners == []
        assert registry.detect(tmp_path) == []
        assert registry.scan(tmp_path) == []

    def test_register_and_detect(self, tmp_path):
        registry = ScannerRegistry()
        scanner_a = _StubScanner("a", detects=True)
        scanner_b = _StubScanner("b", detects=False)
        registry.register(scanner_a)
        registry.register(scanner_b)
        assert len(registry.scanners) == 2
        # Only the scanner whose detect() returns True is applicable.
        applicable = registry.detect(tmp_path)
        assert len(applicable) == 1
        assert applicable[0].name == "a"

    def test_scan_runs_applicable_only(self, tmp_path):
        registry = ScannerRegistry()
        registry.register(_StubScanner("detected", detects=True, issue_count=2))
        registry.register(_StubScanner("skipped", detects=False, issue_count=5))
        results = registry.scan(tmp_path)
        assert len(results) == 1
        assert results[0].scanner_name == "detected"
        assert results[0].issue_count == 2

    def test_scan_handles_exception(self, tmp_path):
        """A scanner that raises during run should not break the registry."""

        class CrashingScanner:
            @property
            def name(self) -> str:
                return "crasher"

            def detect(self, project_path: Path) -> bool:
                return True

            def run(self, project_path: Path) -> SecurityScanResult:
                raise RuntimeError("Scanner exploded")

        registry = ScannerRegistry()
        registry.register(CrashingScanner())
        results = registry.scan(tmp_path)
        # The crash is expected to surface as a failed result, not an exception.
        assert len(results) == 1
        assert results[0].passed is False
        assert "unexpected error" in results[0].details

    def test_detect_handles_exception(self, tmp_path):
        """A scanner that raises during detect should be skipped."""

        class BadDetectScanner:
            @property
            def name(self) -> str:
                return "bad-detect"

            def detect(self, project_path: Path) -> bool:
                raise RuntimeError("Detect failed")

            def run(self, project_path: Path) -> SecurityScanResult:
                return SecurityScanResult(scanner_name="bad-detect")

        registry = ScannerRegistry()
        registry.register(BadDetectScanner())
        applicable = registry.detect(tmp_path)
        assert applicable == []


class TestDefaultRegistry:
    """The process-wide default registry ships with every built-in scanner."""

    def test_default_registry_has_all_scanners(self, monkeypatch):
        import factory.security

        # Reset the cached global so we get a fresh registry. monkeypatch puts
        # the previous value back after the test, so the reset does not leak
        # into tests that run later.
        monkeypatch.setattr(factory.security, "_default_registry", None, raising=False)
        registry = get_default_registry()
        names = {s.name for s in registry.scanners}
        assert names == {"bandit", "npm-audit", "semgrep", "trivy", "git-secrets"}


# ── BanditScanner tests ─────────────────────────────────────────


class TestBanditScanner:
    """BanditScanner detection and parsing of bandit's JSON output."""

    def test_detect_python_project_with_bandit(self, tmp_path):
        (tmp_path / "pyproject.toml").write_text("[project]\n")
        scanner = BanditScanner()
        with patch("factory.security.scanners._tool_available", return_value=True):
            assert scanner.detect(tmp_path) is True

    def test_detect_non_python_project(self, tmp_path):
        scanner = BanditScanner()
        assert scanner.detect(tmp_path) is False

    def test_detect_bandit_not_installed(self, tmp_path):
        (tmp_path / "pyproject.toml").write_text("[project]\n")
        scanner = BanditScanner()
        with patch("factory.security.scanners._tool_available", return_value=False):
            assert scanner.detect(tmp_path) is False

    def test_run_clean(self, tmp_path):
        scanner = BanditScanner()
        bandit_output = json.dumps({"results": []})
        # _run_cmd is mocked as a (returncode, stdout, stderr) triple.
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (0, bandit_output, "")
            result = scanner.run(tmp_path)
        assert result.passed is True
        assert result.issue_count == 0
        assert result.details == "clean"

    def test_run_with_issues(self, tmp_path):
        scanner = BanditScanner()
        bandit_output = json.dumps({
            "results": [
                {
                    "issue_severity": "HIGH",
                    "test_id": "B101",
                    "filename": "app.py",
                    "line_number": 10,
                    "issue_text": "Use of exec detected",
                    "more_info": "https://bandit.readthedocs.io/...",
                },
                {
                    # Deliberately has no "more_info" key.
                    "issue_severity": "MEDIUM",
                    "test_id": "B105",
                    "filename": "config.py",
                    "line_number": 5,
                    "issue_text": "Hardcoded password",
                },
            ],
        })
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (1, bandit_output, "")
            result = scanner.run(tmp_path)
        assert result.passed is False
        assert result.issue_count == 2
        assert result.issues[0].severity == SecuritySeverity.HIGH
        assert result.issues[0].category == "B101"
        assert result.issues[0].file == "app.py"
        assert result.issues[0].line == 10
        assert result.issues[1].severity == SecuritySeverity.MEDIUM

    def test_run_bandit_not_found(self, tmp_path):
        scanner = BanditScanner()
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (1, "", "Command not found: bandit")
            result = scanner.run(tmp_path)
        # A missing tool is reported as a pass with an explanatory detail.
        assert result.passed is True
        assert "not installed" in result.details

    def test_run_invalid_json(self, tmp_path):
        scanner = BanditScanner()
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (0, "not json", "")
            result = scanner.run(tmp_path)
        # Unparseable output must not be turned into issues.
        assert result.passed is True
        assert result.issue_count == 0


# ── NpmAuditScanner tests ───────────────────────────────────────


class TestNpmAuditScanner:
    """NpmAuditScanner detection and parsing of `npm audit` metadata."""

    def test_detect_node_project_with_lockfile(self, tmp_path):
        (tmp_path / "package.json").write_text("{}\n")
        (tmp_path / "package-lock.json").write_text("{}\n")
        scanner = NpmAuditScanner()
        with patch("factory.security.scanners._tool_available", return_value=True):
            assert scanner.detect(tmp_path) is True

    def test_detect_no_lockfile(self, tmp_path):
        (tmp_path / "package.json").write_text("{}\n")
        scanner = NpmAuditScanner()
        assert scanner.detect(tmp_path) is False

    def test_detect_not_node_project(self, tmp_path):
        scanner = NpmAuditScanner()
        assert scanner.detect(tmp_path) is False

    def test_run_clean(self, tmp_path):
        scanner = NpmAuditScanner()
        audit_output = json.dumps({
            "metadata": {
                "vulnerabilities": {"low": 0, "moderate": 0, "high": 0, "critical": 0},
            },
        })
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (0, audit_output, "")
            result = scanner.run(tmp_path)
        assert result.passed is True
        assert result.details == "clean"

    def test_run_with_vulnerabilities(self, tmp_path):
        scanner = NpmAuditScanner()
        audit_output = json.dumps({
            "metadata": {
                "vulnerabilities": {"low": 2, "moderate": 1, "high": 1, "critical": 0},
            },
        })
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (1, audit_output, "")
            result = scanner.run(tmp_path)
        assert result.passed is False
        # 2 low + 1 moderate + 1 high = 4
        assert "4 vulnerabilities" in result.details

    def test_run_npm_not_found(self, tmp_path):
        scanner = NpmAuditScanner()
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (1, "", "Command not found: npm")
            result = scanner.run(tmp_path)
        assert result.passed is True
        assert "not installed" in result.details


# ── SemgrepScanner tests ────────────────────────────────────────


class TestSemgrepScanner:
    """SemgrepScanner detection and parsing of semgrep's JSON findings."""

    def test_detect_checks_tool_availability(self, tmp_path):
        # tmp_path is empty, so the outcome follows tool availability alone.
        scanner = SemgrepScanner()
        with patch("factory.security.scanners._tool_available", return_value=True):
            assert scanner.detect(tmp_path) is True
        with patch("factory.security.scanners._tool_available", return_value=False):
            assert scanner.detect(tmp_path) is False

    def test_run_clean(self, tmp_path):
        scanner = SemgrepScanner()
        semgrep_output = json.dumps({"results": []})
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (0, semgrep_output, "")
            result = scanner.run(tmp_path)
        assert result.passed is True
        assert result.details == "clean"

    def test_run_with_findings(self, tmp_path):
        scanner = SemgrepScanner()
        semgrep_output = json.dumps({
            "results": [
                {
                    "check_id": "python.lang.security.audit.exec-detected",
                    "path": "app.py",
                    "start": {"line": 15},
                    "extra": {
                        "severity": "ERROR",
                        "message": "Detected use of exec()",
                        "fix": "Use ast.literal_eval instead",
                    },
                },
            ],
        })
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (1, semgrep_output, "")
            result = scanner.run(tmp_path)
        assert result.passed is False
        assert result.issue_count == 1
        # semgrep severity "ERROR" is expected to surface as HIGH.
        assert result.issues[0].severity == SecuritySeverity.HIGH
        assert result.issues[0].file == "app.py"
        assert result.issues[0].line == 15

    def test_run_not_installed(self, tmp_path):
        scanner = SemgrepScanner()
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (1, "", "Command not found: semgrep")
            result = scanner.run(tmp_path)
        assert result.passed is True


# ── TrivyScanner tests ──────────────────────────────────────────


class TestTrivyScanner:
    """TrivyScanner detection and parsing of vulnerabilities and secrets."""

    def test_detect_checks_tool_availability(self, tmp_path):
        # tmp_path is empty, so the outcome follows tool availability alone.
        scanner = TrivyScanner()
        with patch("factory.security.scanners._tool_available", return_value=True):
            assert scanner.detect(tmp_path) is True
        with patch("factory.security.scanners._tool_available", return_value=False):
            assert scanner.detect(tmp_path) is False

    def test_run_clean(self, tmp_path):
        scanner = TrivyScanner()
        trivy_output = json.dumps({"Results": []})
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (0, trivy_output, "")
            result = scanner.run(tmp_path)
        assert result.passed is True
        assert result.details == "clean"

    def test_run_with_vulnerabilities(self, tmp_path):
        scanner = TrivyScanner()
        trivy_output = json.dumps({
            "Results": [
                {
                    "Target": "requirements.txt",
                    "Vulnerabilities": [
                        {
                            "VulnerabilityID": "CVE-2024-1234",
                            "Severity": "CRITICAL",
                            "Title": "Remote code execution in foo",
                            "FixedVersion": "2.0.1",
                        },
                    ],
                    "Secrets": [],
                },
            ],
        })
        # Exit code is 0 here even though findings exist: the verdict must
        # come from the parsed output, not from the return code.
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (0, trivy_output, "")
            result = scanner.run(tmp_path)
        assert result.passed is False
        assert result.issue_count == 1
        assert result.issues[0].severity == SecuritySeverity.CRITICAL
        assert result.issues[0].category == "CVE-2024-1234"

    def test_run_with_secrets(self, tmp_path):
        scanner = TrivyScanner()
        trivy_output = json.dumps({
            "Results": [
                {
                    "Target": "config.py",
                    "Vulnerabilities": [],
                    "Secrets": [
                        {
                            "StartLine": 5,
                            "Title": "AWS access key",
                        },
                    ],
                },
            ],
        })
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (0, trivy_output, "")
            result = scanner.run(tmp_path)
        assert result.passed is False
        assert result.issue_count == 1
        # The secret entry carries no severity field; HIGH is expected.
        assert result.issues[0].severity == SecuritySeverity.HIGH
        assert result.issues[0].category == "hardcoded_secret"

    def test_run_not_installed(self, tmp_path):
        scanner = TrivyScanner()
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (1, "", "Command not found: trivy")
            result = scanner.run(tmp_path)
        assert result.passed is True


# ── GitSecretsScanner tests ─────────────────────────────────────


class TestGitSecretsScanner:
    """GitSecretsScanner detection and parsing of `file:line:match` output."""

    def test_detect_needs_git_repo(self, tmp_path):
        scanner = GitSecretsScanner()
        assert scanner.detect(tmp_path) is False

    def test_detect_git_repo_with_tool(self, tmp_path):
        (tmp_path / ".git").mkdir()
        scanner = GitSecretsScanner()
        with patch("factory.security.scanners._tool_available", return_value=True):
            assert scanner.detect(tmp_path) is True

    def test_detect_git_repo_without_tool(self, tmp_path):
        (tmp_path / ".git").mkdir()
        scanner = GitSecretsScanner()
        with patch("factory.security.scanners._tool_available", return_value=False):
            assert scanner.detect(tmp_path) is False

    def test_run_clean(self, tmp_path):
        scanner = GitSecretsScanner()
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (0, "", "")
            result = scanner.run(tmp_path)
        assert result.passed is True
        assert result.details == "clean"

    def test_run_with_secrets(self, tmp_path):
        scanner = GitSecretsScanner()
        # One finding per line, formatted as "<file>:<line>:<matched text>".
        secrets_output = "config.py:10:AWS_SECRET_KEY=AKIA...\napp.py:25:password='admin123'"
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (1, secrets_output, "")
            result = scanner.run(tmp_path)
        assert result.passed is False
        assert result.issue_count == 2
        assert result.issues[0].file == "config.py"
        assert result.issues[0].line == 10
        assert result.issues[1].file == "app.py"
        assert result.issues[1].line == 25

    def test_run_not_installed(self, tmp_path):
        scanner = GitSecretsScanner()
        with patch("factory.security.scanners._run_cmd") as mock:
            mock.return_value = (1, "", "Command not found: git")
            result = scanner.run(tmp_path)
        assert result.passed is True


# ── eval_security integration tests ─────────────────────────────


class TestEvalSecurityIntegration:
    """Test eval_security() from hygiene.py using the scanner registry."""

    def test_no_scanner_returns_neutral(self, tmp_path):
        from factory.eval.hygiene import eval_security

        # Empty dir, no project markers. The registry is patched to report
        # that nothing ran: some scanners' detect() depends only on whether
        # the tool is installed (see the semgrep/trivy detect tests), so
        # against the real registry this result could vary with the host.
        with patch("factory.security.ScannerRegistry.scan", return_value=[]):
            result = eval_security(tmp_path)

        assert result["name"] == "security"
        assert result["score"] == 0.5
        # NOTE(review): the "Not detected" wording presumably comes from the
        # hygiene._neutral helper, which is not visible from here; confirm.
        assert "Not detected" in result["details"]

    def test_clean_scan(self, tmp_path):
        from factory.eval.hygiene import eval_security

        (tmp_path / "pyproject.toml").write_text("[project]\n")

        clean_result = SecurityScanResult(
            scanner_name="bandit",
            issues=[],
            passed=True,
            details="clean",
        )
        with patch("factory.security.ScannerRegistry.scan", return_value=[clean_result]):
            result = eval_security(tmp_path)

        assert result["score"] == 1.0
        assert result["passed"] is True

    def test_scan_with_issues(self, tmp_path):
        from factory.eval.hygiene import eval_security

        (tmp_path / "pyproject.toml").write_text("[project]\n")

        issues = [
            SecurityIssue(severity=SecuritySeverity.HIGH, category="B101", scanner="bandit"),
            SecurityIssue(severity=SecuritySeverity.MEDIUM, category="B105", scanner="bandit"),
            SecurityIssue(severity=SecuritySeverity.LOW, category="B106", scanner="bandit"),
        ]
        result_with_issues = SecurityScanResult(
            scanner_name="bandit",
            issues=issues,
            passed=False,
            details="3 issues",
        )
        with patch("factory.security.ScannerRegistry.scan", return_value=[result_with_issues]):
            result = eval_security(tmp_path)

        # Each issue deducts 0.1 from a perfect score of 1.0.
        assert result["score"] == round(1.0 - 3 * 0.1, 4)
        assert result["passed"] is False
        assert "3 issues" in result["details"]

    def test_score_floor_at_zero(self, tmp_path):
        from factory.eval.hygiene import eval_security

        (tmp_path / "pyproject.toml").write_text("[project]\n")

        # 15 issues would give 1.0 - 1.5 < 0; the score must clamp at 0.0.
        issues = [
            SecurityIssue(severity=SecuritySeverity.LOW, category=f"B{i}", scanner="bandit")
            for i in range(15)
        ]
        result_many = SecurityScanResult(
            scanner_name="bandit",
            issues=issues,
            passed=False,
            details="15 issues",
        )
        with patch("factory.security.ScannerRegistry.scan", return_value=[result_many]):
            result = eval_security(tmp_path)

        assert result["score"] == 0.0