diff --git a/README.md b/README.md index fe225b6..36ae870 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,12 @@ > Ingest SCAP/SCC/OpenSCAP/Wazuh findings → produce eMASS-ready POAM + OSCAL Assessment Results. + +## What is this? + +stigsentry is a command-line tool for security and compliance teams who need to track and report on system vulnerabilities against government standards. It reads scan results from security tools like Wazuh or OpenSCAP and automatically maps each finding to the specific NIST 800-53 control and DISA STIG rule it violates. The output is a ready-to-import spreadsheet (POAM) for risk management systems like eMASS or Xacta, saving hours of manual cross-referencing. It is built for military, federal, and defense contractor teams who must maintain continuous ATO (Authority to Operate) packages. + + ## Upstream Forks / wraps **https://github.com/wazuh/wazuh**. See [`UPSTREAM.md`](./UPSTREAM.md) for the @@ -17,6 +23,52 @@ licensing posture, supported commits, and how to upgrade. - POAM CSV for eMASS / Xacta / RSA Archer - OSCAL Assessment Results JSON + +## Domains + +**Primary domain:** Government & Compliance · **JTF MERIDIAN division:** IRONCLAD · ANVIL + +**Topics:** `cognis` `compliance` `govtech` `grc` + +Part of the **Cognis Neural Suite** — 300+ source-available tools organized across 12 domains under the JTF MERIDIAN command structure. See the [suite on GitHub](https://github.com/cognis-digital) and [jtf-meridian](https://github.com/cognis-digital/jtf-meridian) for how the pieces fit together. + + + +## Install + +`stigsentry` is source-available (not published to PyPI) — every method below installs +straight from GitHub. Pick whichever you prefer; the one-line scripts auto-detect +the best tool available on your machine. + +**One-liner (Linux / macOS):** +```sh +curl -fsSL https://raw.githubusercontent.com/cognis-digital/stigsentry/HEAD/install.sh | sh +``` + +**One-liner (Windows PowerShell):** +```powershell +irm https://raw.githubusercontent.com/cognis-digital/stigsentry/HEAD/install.ps1 | iex +``` + +**Or install manually — any one of:** +```sh +pipx install "git+https://github.com/cognis-digital/stigsentry.git" # isolated (recommended) +uv tool install "git+https://github.com/cognis-digital/stigsentry.git" # uv +pip install "git+https://github.com/cognis-digital/stigsentry.git" # pip +``` + +**From source:** +```sh +git clone https://github.com/cognis-digital/stigsentry.git +cd stigsentry && pip install . +``` + +Then run: +```sh +stigsentry --help +``` + + ## Install ```bash @@ -68,7 +120,7 @@ These are emitted in JSON, SARIF, and the OSCAL skeleton. ```yaml - name: stigsentry scan run: | - pip install cognis-stigsentry + pip install "git+https://github.com/cognis-digital/stigsentry.git" stigsentry . --format=oscal --out=assessment-results.json --fail-on=high - name: Upload to eMASS/Xacta run: cognis-rmf-package import assessment-results.json diff --git a/cognis_mil/__init__.py b/cognis_mil/__init__.py index 10e8e7b..93e9646 100644 --- a/cognis_mil/__init__.py +++ b/cognis_mil/__init__.py @@ -7,10 +7,16 @@ - CLI builder with --classification flag (placeholder only) - Audit-log primitive (hash-chained, tamper-evident, local-only) """ -from .models import Severity, Finding, ScanResult -from .exporters import to_console, to_json, to_sarif, to_markdown, to_oscal_skeleton -from .cli import make_cli -from .audit import AuditLog -from .classmark import ClassificationBanner # re-export for convenience +from .models import Severity as Severity, Finding as Finding, ScanResult as ScanResult +from .exporters import ( + to_console as to_console, + to_json as to_json, + to_sarif as to_sarif, + to_markdown as to_markdown, + to_oscal_skeleton as to_oscal_skeleton, +) +from .cli import make_cli as make_cli +from .audit import AuditLog as AuditLog +from .classmark import ClassificationBanner as ClassificationBanner __version__ = "0.1.0" diff --git a/cognis_mil/audit.py b/cognis_mil/audit.py index b201e6d..38d62af 100644 --- a/cognis_mil/audit.py +++ b/cognis_mil/audit.py @@ -1,15 +1,20 @@ """Tamper-evident audit log. Hash-chained, append-only, local file.""" from __future__ import annotations -import hashlib, json, time + +import hashlib +import json +import time from pathlib import Path + class AuditLog: def __init__(self, path: Path): self.path = Path(path) self.path.parent.mkdir(parents=True, exist_ok=True) def _last_hash(self) -> str: - if not self.path.exists(): return "GENESIS" + if not self.path.exists(): + return "GENESIS" try: last = self.path.read_text().rstrip().split("\n")[-1] return json.loads(last)["hash"] @@ -30,17 +35,28 @@ def append(self, event: dict) -> dict: return entry def verify(self) -> tuple[bool, str]: - if not self.path.exists(): return True, "Empty log" + if not self.path.exists(): + return True, "Empty log" + lines = self.path.read_text().splitlines() + if not lines: + return True, "Empty log" prev = "GENESIS" - for i, line in enumerate(self.path.read_text().splitlines(), 1): + count = 0 + for i, line in enumerate(lines, 1): try: e = json.loads(line) - except: return False, f"Line {i}: not valid JSON" - recomputed_body = json.dumps({k:e[k] for k in ("ts","prev","event")}, sort_keys=True, default=str) + except json.JSONDecodeError: + return False, f"Line {i}: not valid JSON" + recomputed_body = json.dumps( + {k: e[k] for k in ("ts", "prev", "event")}, + sort_keys=True, + default=str, + ) recomputed = hashlib.sha256((recomputed_body + prev).encode()).hexdigest() if recomputed != e["hash"]: return False, f"Hash mismatch at line {i}" if e["prev"] != prev: return False, f"Prev mismatch at line {i}" prev = e["hash"] - return True, f"Chain OK ({i} entries)" + count = i + return True, f"Chain OK ({count} entries)" diff --git a/cognis_mil/classmark.py b/cognis_mil/classmark.py index c389256..93f546a 100644 --- a/cognis_mil/classmark.py +++ b/cognis_mil/classmark.py @@ -6,24 +6,29 @@ Reference: ODNI CAPCO Implementation Manual (unclassified, public). """ from __future__ import annotations + from dataclasses import dataclass, field VALID_LEVELS = ["UNCLASSIFIED", "CONFIDENTIAL", "SECRET", "TOP SECRET"] -VALID_FGI = ["FGI"] # Foreign Government Information marker placeholder +VALID_FGI = ["FGI"] # Foreign Government Information marker placeholder + @dataclass class ClassificationBanner: """Builds a CAPCO-shape banner. Validation of *form*, not content.""" - level: str = "UNCLASSIFIED" # operator-supplied - sci: list[str] = field(default_factory=list) # e.g. operator-supplied SCI compartments - sap: list[str] = field(default_factory=list) # SAP program IDs (operator-supplied) - dissem: list[str] = field(default_factory=list) # NOFORN/REL TO/ORCON etc. (operator-supplied) - nonic: list[str] = field(default_factory=list) # Non-IC dissem (FOUO/CUI etc.) + + level: str = "UNCLASSIFIED" # operator-supplied + sci: list[str] = field(default_factory=list) # operator-supplied SCI compartments + sap: list[str] = field(default_factory=list) # SAP program IDs (operator-supplied) + dissem: list[str] = field(default_factory=list) # NOFORN/REL TO/ORCON etc. + nonic: list[str] = field(default_factory=list) # Non-IC dissem (FOUO/CUI etc.) def validate(self) -> tuple[bool, list[str]]: errs = [] if self.level not in VALID_LEVELS: - errs.append(f"Invalid base level: {self.level}. Expected one of {VALID_LEVELS}.") + errs.append( + f"Invalid base level: {self.level}. Expected one of {VALID_LEVELS}." + ) # Higher levels with no markings is a smell, but not invalid if self.level == "UNCLASSIFIED" and (self.sci or self.sap): errs.append("UNCLASSIFIED cannot carry SCI/SAP compartments") @@ -32,13 +37,18 @@ def validate(self) -> tuple[bool, list[str]]: def render(self) -> str: """Render the banner-line string. Operator content is passed through.""" parts = [self.level] - if self.sci: parts.append("/".join(self.sci)) - if self.sap: parts.append("SAR-" + "/".join(self.sap)) + if self.sci: + parts.append("/".join(self.sci)) + if self.sap: + parts.append("SAR-" + "/".join(self.sap)) suffix = [] - if self.dissem: suffix.extend(self.dissem) - if self.nonic: suffix.extend(self.nonic) + if self.dissem: + suffix.extend(self.dissem) + if self.nonic: + suffix.extend(self.nonic) line = "//".join(parts) - if suffix: line += "//" + "/".join(suffix) + if suffix: + line += "//" + "/".join(suffix) return line @classmethod diff --git a/cognis_mil/cli.py b/cognis_mil/cli.py index b269180..f16acfa 100644 --- a/cognis_mil/cli.py +++ b/cognis_mil/cli.py @@ -1,32 +1,97 @@ -import argparse, sys +import argparse +import sys + from .exporters import to_console, to_json, to_markdown, to_sarif, to_oscal_skeleton + def make_cli(tool_name, scan_fn, version="0.1.0", extra_args=None): - p = argparse.ArgumentParser(prog=tool_name, description=f"{tool_name} — Cognis Digital · Military/IC ecosystem") + p = argparse.ArgumentParser( + prog=tool_name, + description=f"{tool_name} — Cognis Digital · Military/IC ecosystem", + ) p.add_argument("target", nargs="?", default=".", help="Path/target") - p.add_argument("--format", choices=["console","json","markdown","sarif","oscal"], default="console") + p.add_argument( + "--format", + choices=["console", "json", "markdown", "sarif", "oscal"], + default="console", + ) p.add_argument("--out", help="Write output to file") - p.add_argument("--fail-on", choices=["very_high","high","moderate","low","none"], default="none") - p.add_argument("--classification", default="UNCLASSIFIED//FOR PUBLIC RELEASE", - help="Operator-supplied banner. PLACEHOLDER. Tool does not interpret.") - p.add_argument("-v","--version", action="version", version=f"{tool_name} {version}") + p.add_argument( + "--fail-on", + choices=["very_high", "high", "moderate", "low", "none"], + default="none", + ) + p.add_argument( + "--classification", + default="UNCLASSIFIED//FOR PUBLIC RELEASE", + help="Operator-supplied banner. PLACEHOLDER. Tool does not interpret.", + ) + p.add_argument( + "-v", "--version", action="version", version=f"{tool_name} {version}" + ) if extra_args: - for a in extra_args: p.add_argument(*a["flags"], **{k:v for k,v in a.items() if k!="flags"}) + for a in extra_args: + p.add_argument( + *a["flags"], + **{k: v for k, v in a.items() if k != "flags"}, + ) args = p.parse_args() - result = scan_fn(args.target, **{k:v for k,v in vars(args).items() - if k not in {"target","format","out","fail_on","version","classification"}}) + + # Run the scan, converting expected errors into clean stderr messages. + try: + result = scan_fn( + args.target, + **{ + k: v + for k, v in vars(args).items() + if k not in { + "target", "format", "out", "fail_on", "version", "classification" + } + }, + ) + except Exception as exc: # noqa: BLE001 + print(f"{tool_name}: error: {exc}", file=sys.stderr) + sys.exit(2) + result.classification_placeholder = args.classification - if hasattr(result, "finalize") and not result.composite_score: result.finalize() - fmt = {"console":to_console,"json":to_json,"markdown":to_markdown,"sarif":to_sarif,"oscal":to_oscal_skeleton}[args.format] + if hasattr(result, "finalize") and not result.composite_score: + result.finalize() + + fmt = { + "console": to_console, + "json": to_json, + "markdown": to_markdown, + "sarif": to_sarif, + "oscal": to_oscal_skeleton, + }[args.format] out = fmt(result) + if args.out: - open(args.out,"w").write(out); print(f"Wrote {args.out}", file=sys.stderr) - else: print(out) + try: + with open(args.out, "w") as fh: + fh.write(out) + print(f"Wrote {args.out}", file=sys.stderr) + except OSError as exc: + print( + f"{tool_name}: cannot write to {args.out!r}: {exc}", + file=sys.stderr, + ) + sys.exit(2) + else: + print(out) + if args.fail_on != "none": from .models import Severity - thresh = {"very_high":[Severity.VERY_HIGH], - "high":[Severity.VERY_HIGH,Severity.HIGH], - "moderate":[Severity.VERY_HIGH,Severity.HIGH,Severity.MODERATE], - "low":[Severity.VERY_HIGH,Severity.HIGH,Severity.MODERATE,Severity.LOW]}[args.fail_on] - if any(f.severity in thresh for f in result.findings): sys.exit(1) + + thresh = { + "very_high": [Severity.VERY_HIGH], + "high": [Severity.VERY_HIGH, Severity.HIGH], + "moderate": [Severity.VERY_HIGH, Severity.HIGH, Severity.MODERATE], + "low": [ + Severity.VERY_HIGH, Severity.HIGH, Severity.MODERATE, Severity.LOW + ], + }[args.fail_on] + if any(f.severity in thresh for f in result.findings): + sys.exit(1) + sys.exit(0) diff --git a/cognis_mil/exporters.py b/cognis_mil/exporters.py index eab768b..247367f 100644 --- a/cognis_mil/exporters.py +++ b/cognis_mil/exporters.py @@ -1,85 +1,167 @@ import json + from .models import Severity, ScanResult -ICON = {Severity.VERY_HIGH:"🚨", Severity.HIGH:"❗", Severity.MODERATE:"⚠️ ", Severity.LOW:"•", Severity.VERY_LOW:"ℹ️ "} +ICON = { + Severity.VERY_HIGH: "\U0001f6a8", + Severity.HIGH: "❗", + Severity.MODERATE: "⚠️ ", + Severity.LOW: "•", + Severity.VERY_LOW: "ℹ️ ", +} + def to_json(r: ScanResult) -> str: return json.dumps(r.to_dict(), indent=2, default=str) + def to_console(r: ScanResult) -> str: lines = [ - "═" * 70, + "=" * 70, f" {r.classification_placeholder}", - "═" * 70, + "=" * 70, f" Tool: {r.tool_name} v{r.tool_version}", f" Items scanned: {r.items_scanned}", f" Composite risk: {r.composite_score}/100 ({r.risk_level})", f" Findings: {r.total_findings()}", - "─" * 70, + "-" * 70, ] for f in r.findings[:100]: - lines.append(f" {ICON[f.severity]} [{f.severity.value.upper():<10}] {f.id:<14} {f.title}") - if f.location: lines.append(f" 📍 {f.location}") - if f.nist_800_53: lines.append(f" 📋 NIST 800-53: {f.nist_800_53}") - if f.disa_stig: lines.append(f" 🛡 STIG: {f.disa_stig}") - if f.mitre_attack:lines.append(f" 🎯 ATT&CK: {f.mitre_attack}") - if f.remediation: lines.append(f" 💡 {f.remediation}") - lines.append("═" * 70) + sev = f.severity.value.upper() + lines.append( + f" {ICON[f.severity]} [{sev:<10}] {f.id:<14} {f.title}" + ) + if f.location: + lines.append(f" [loc] {f.location}") + if f.nist_800_53: + lines.append(f" [NIST 800-53] {f.nist_800_53}") + if f.disa_stig: + lines.append(f" [STIG] {f.disa_stig}") + if f.mitre_attack: + lines.append(f" [ATT&CK] {f.mitre_attack}") + if f.remediation: + lines.append(f" [fix] {f.remediation}") + lines.append("=" * 70) lines.append(f" {r.classification_placeholder}") - lines.append("═" * 70) + lines.append("=" * 70) return "\n".join(lines) + def to_markdown(r: ScanResult) -> str: out = [ f"# {r.tool_name} report", - f"", + "", f"> **{r.classification_placeholder}**", - f"", + "", f"- **Score:** {r.composite_score}/100 ({r.risk_level})", f"- **Items scanned:** {r.items_scanned}", f"- **Findings:** {r.total_findings()}", - f"", + "", "| Sev | ID | Title | NIST | STIG | ATT&CK |", "|-----|----|----|----|----|----|", ] for f in r.findings: - out.append(f"| {f.severity.value} | `{f.id}` | {f.title} | {f.nist_800_53} | {f.disa_stig} | {f.mitre_attack} |") - out.append(""); out.append(f"> **{r.classification_placeholder}**") + out.append( + f"| {f.severity.value} | `{f.id}` | {f.title}" + f" | {f.nist_800_53} | {f.disa_stig} | {f.mitre_attack} |" + ) + out.append("") + out.append(f"> **{r.classification_placeholder}**") return "\n".join(out) + def to_sarif(r: ScanResult) -> str: - sev_map = {Severity.VERY_HIGH:"error", Severity.HIGH:"error", Severity.MODERATE:"warning", Severity.LOW:"note", Severity.VERY_LOW:"note"} - return json.dumps({ - "version":"2.1.0", - "$schema":"https://json.schemastore.org/sarif-2.1.0.json", - "runs":[{ - "tool":{"driver":{"name":r.tool_name,"version":r.tool_version}}, - "properties":{"classification":r.classification_placeholder}, - "results":[{ - "ruleId":f.id, "level":sev_map[f.severity], - "message":{"text":f"{f.title} | {f.description}"}, - "locations":[{"physicalLocation":{"artifactLocation":{"uri":f.location.split(':')[0] if f.location else 'unknown'}}}], - "properties":{"nist":f.nist_800_53, "stig":f.disa_stig, "attack":f.mitre_attack, "cci":f.cci}, - } for f in r.findings], - }], - }, indent=2) + sev_map = { + Severity.VERY_HIGH: "error", + Severity.HIGH: "error", + Severity.MODERATE: "warning", + Severity.LOW: "note", + Severity.VERY_LOW: "note", + } + + def _uri(loc: str) -> str: + return loc.split(":")[0] if loc else "unknown" + + return json.dumps( + { + "version": "2.1.0", + "$schema": "https://json.schemastore.org/sarif-2.1.0.json", + "runs": [ + { + "tool": { + "driver": { + "name": r.tool_name, + "version": r.tool_version, + } + }, + "properties": { + "classification": r.classification_placeholder + }, + "results": [ + { + "ruleId": f.id, + "level": sev_map[f.severity], + "message": {"text": f"{f.title} | {f.description}"}, + "locations": [ + { + "physicalLocation": { + "artifactLocation": { + "uri": _uri(f.location) + } + } + } + ], + "properties": { + "nist": f.nist_800_53, + "stig": f.disa_stig, + "attack": f.mitre_attack, + "cci": f.cci, + }, + } + for f in r.findings + ], + } + ], + }, + indent=2, + ) + def to_oscal_skeleton(r: ScanResult) -> str: """Minimal OSCAL 1.1 Assessment Results skeleton — operator fills the rest.""" - return json.dumps({ - "assessment-results":{ - "uuid": "00000000-0000-0000-0000-000000000000", - "metadata":{"title":f"{r.tool_name} assessment", - "version":r.tool_version, - "oscal-version":"1.1.0", - "remarks":"PLACEHOLDER — operator must supply UUIDs, parties, system-security-plan link"}, - "results":[{ - "uuid":"00000000-0000-0000-0000-000000000001", - "findings":[{ - "uuid":f"finding-{i}", "title":f.title, - "description":f.description, - "related-controls":[{"control-id":f.nist_800_53}] if f.nist_800_53 else [], - } for i, f in enumerate(r.findings, 1)] - }] - } - }, indent=2) + remarks = ( + "PLACEHOLDER — operator must supply UUIDs, parties," + " system-security-plan link" + ) + return json.dumps( + { + "assessment-results": { + "uuid": "00000000-0000-0000-0000-000000000000", + "metadata": { + "title": f"{r.tool_name} assessment", + "version": r.tool_version, + "oscal-version": "1.1.0", + "remarks": remarks, + }, + "results": [ + { + "uuid": "00000000-0000-0000-0000-000000000001", + "findings": [ + { + "uuid": f"finding-{i}", + "title": f.title, + "description": f.description, + "related-controls": ( + [{"control-id": f.nist_800_53}] + if f.nist_800_53 + else [] + ), + } + for i, f in enumerate(r.findings, 1) + ], + } + ], + } + }, + indent=2, + ) diff --git a/cognis_mil/models.py b/cognis_mil/models.py index 1b3cd29..b89f523 100644 --- a/cognis_mil/models.py +++ b/cognis_mil/models.py @@ -66,13 +66,23 @@ def all_findings(self): return self.findings def total_findings(self): return len(self.findings) def finalize(self): - import math if not self.findings: - self.composite_score = 0.0; self.risk_level = "Very Low"; return self + self.composite_score = 0.0 + self.risk_level = "Very Low" + return self score = sum(f.weight for f in self.findings) * 1.5 self.composite_score = min(100.0, score) s = self.composite_score - self.risk_level = "Very High" if s >= 80 else "High" if s >= 60 else "Moderate" if s >= 40 else "Low" if s >= 20 else "Very Low" + if s >= 80: + self.risk_level = "Very High" + elif s >= 60: + self.risk_level = "High" + elif s >= 40: + self.risk_level = "Moderate" + elif s >= 20: + self.risk_level = "Low" + else: + self.risk_level = "Very Low" return self def to_dict(self): diff --git a/install.ps1 b/install.ps1 new file mode 100644 index 0000000..8cf09f1 --- /dev/null +++ b/install.ps1 @@ -0,0 +1,29 @@ +# Comprehensive installer for cognis-digital/stigsentry (Windows PowerShell). +# Tries: pipx -> uv -> pip (git+https) -> from source. +# stigsentry is source-available and not on PyPI; all paths install from GitHub. +$ErrorActionPreference = "Stop" +$Repo = "stigsentry" +$Url = "git+https://github.com/cognis-digital/stigsentry.git" +$Git = "https://github.com/cognis-digital/stigsentry.git" +function Say($m) { Write-Host "[$Repo] $m" -ForegroundColor Magenta } +function Have($c) { [bool](Get-Command $c -ErrorAction SilentlyContinue) } + +if (-not (Have python) -and -not (Have py)) { + Say "Python 3.9+ is required but was not found. Install Python first."; exit 1 +} +if (Have pipx) { + Say "Installing with pipx (isolated, recommended)..." + pipx install $Url; if ($LASTEXITCODE -eq 0) { Say "Done. Run: stigsentry"; exit 0 } +} +if (Have uv) { + Say "Installing with uv..." + uv tool install $Url; if ($LASTEXITCODE -eq 0) { Say "Done. Run: stigsentry"; exit 0 } +} +if (Have pip) { + Say "Installing with pip (user site)..." + pip install --user $Url; if ($LASTEXITCODE -eq 0) { Say "Done. Run: stigsentry"; exit 0 } +} +Say "No packaging tool worked; falling back to a source clone." +$Tmp = Join-Path $env:TEMP "$Repo-src" +git clone --depth 1 $Git $Tmp +Say "Cloned to $Tmp - run: cd $Tmp; python -m pip install ." diff --git a/install.sh b/install.sh index 3eed433..e2d0573 100644 --- a/install.sh +++ b/install.sh @@ -1,10 +1,34 @@ -#!/usr/bin/env sh -# Universal installer for stigsentry. Prefers uv > pipx > pip; installs from the repo. -set -e -SRC="git+https://github.com/cognis-digital/stigsentry.git" -echo "Installing stigsentry ..." -if command -v uv >/dev/null 2>&1; then uv tool install "$SRC" -elif command -v pipx >/dev/null 2>&1; then pipx install "$SRC" -elif command -v python3 >/dev/null 2>&1; then python3 -m pip install --user "$SRC" -else echo "Need uv, pipx, or python3+pip"; exit 1; fi -echo "Done. Run: stigsentry --help" +#!/usr/bin/env sh +# Comprehensive installer for cognis-digital/stigsentry (Linux / macOS). +# Tries the best available method: pipx -> uv -> pip (git+https) -> from source. +# stigsentry is source-available and not on PyPI; all paths install from GitHub. +set -eu + +REPO="stigsentry" +URL="git+https://github.com/cognis-digital/stigsentry.git" +GITURL="https://github.com/cognis-digital/stigsentry.git" + +say() { printf '\033[1;35m[%s]\033[0m %s\n' "$REPO" "$1"; } +have() { command -v "$1" >/dev/null 2>&1; } + +if ! have python3 && ! have python; then + say "Python 3.9+ is required but was not found. Install Python first."; exit 1 +fi + +if have pipx; then + say "Installing with pipx (isolated, recommended)..." + pipx install "$URL" && { say "Done. Run: stigsentry"; exit 0; } +fi +if have uv; then + say "Installing with uv..." + uv tool install "$URL" && { say "Done. Run: stigsentry"; exit 0; } +fi +if have pip3 || have pip; then + PIP="$(command -v pip3 || command -v pip)" + say "Installing with pip (user site)..." + "$PIP" install --user "$URL" && { say "Done. Run: stigsentry"; exit 0; } +fi + +say "No packaging tool worked; falling back to a source clone." +TMP="$(mktemp -d)"; git clone --depth 1 "$GITURL" "$TMP/$REPO" +say "Cloned to $TMP/$REPO — run: cd $TMP/$REPO && python3 -m pip install ." diff --git a/layman.md b/layman.md new file mode 100644 index 0000000..dcbdea0 --- /dev/null +++ b/layman.md @@ -0,0 +1 @@ +stigsentry is a command-line tool for security and compliance teams who need to track and report on system vulnerabilities against government standards. It reads scan results from security tools like Wazuh or OpenSCAP and automatically maps each finding to the specific NIST 800-53 control and DISA STIG rule it violates. The output is a ready-to-import spreadsheet (POAM) for risk management systems like eMASS or Xacta, saving hours of manual cross-referencing. It is built for military, federal, and defense contractor teams who must maintain continuous ATO (Authority to Operate) packages. diff --git a/stigsentry/__main__.py b/stigsentry/__main__.py index 1227c45..6071637 100644 --- a/stigsentry/__main__.py +++ b/stigsentry/__main__.py @@ -1,3 +1,3 @@ -import sys -from stigsentry.cli import main -sys.exit(main()) +from stigsentry.cli import main + +main() diff --git a/stigsentry/cli.py b/stigsentry/cli.py index d4a8a94..a66a29b 100644 --- a/stigsentry/cli.py +++ b/stigsentry/cli.py @@ -1,5 +1,8 @@ from cognis_mil import make_cli -from .core import scan + from . import __version__ -def main(): make_cli("stigsentry", scan, version=__version__) -if __name__ == "__main__": main() +from .core import scan + + +def main(): + make_cli("stigsentry", scan, version=__version__) diff --git a/stigsentry/core.py b/stigsentry/core.py index eeacf50..839f731 100644 --- a/stigsentry/core.py +++ b/stigsentry/core.py @@ -4,70 +4,223 @@ SIEM-agnostic enrichment layer. """ from __future__ import annotations -import json, csv, time +import csv +import io +import json +import sys from pathlib import Path + from cognis_mil import ScanResult, Finding, Severity # Sample of the DISA STIG → NIST 800-53 crosswalk (public) # Real operators load DISA's full SCAP / XCCDF benchmarks. STIG_CONTROLS = { - "V-238211": {"title":"Admin account lockout < 3 attempts", "nist":"AC-7(a)", "sev": Severity.HIGH, "cci":"CCI-000044"}, - "V-238213": {"title":"SSH root login permitted", "nist":"AC-6(2)", "sev": Severity.HIGH, "cci":"CCI-000206"}, - "V-238219": {"title":"PIV/CAC not required", "nist":"IA-2(11)","sev": Severity.HIGH, "cci":"CCI-000765"}, - "V-238230": {"title":"Unsigned kernel modules", "nist":"SI-7", "sev": Severity.HIGH, "cci":"CCI-001749"}, - "V-238298": {"title":"FIPS 140 mode disabled", "nist":"SC-13", "sev": Severity.VERY_HIGH, "cci":"CCI-002450"}, - "V-238382": {"title":"World-writable file owned by root", "nist":"AC-3", "sev": Severity.HIGH, "cci":"CCI-000213"}, - "V-242414": {"title":"Kubernetes API not requiring auth", "nist":"IA-2", "sev": Severity.VERY_HIGH, "cci":"CCI-000764"}, - "V-242418": {"title":"Blank password permitted", "nist":"IA-5", "sev": Severity.VERY_HIGH, "cci":"CCI-000196"}, + "V-238211": { + "title": "Admin account lockout < 3 attempts", + "nist": "AC-7(a)", + "sev": Severity.HIGH, + "cci": "CCI-000044", + }, + "V-238213": { + "title": "SSH root login permitted", + "nist": "AC-6(2)", + "sev": Severity.HIGH, + "cci": "CCI-000206", + }, + "V-238219": { + "title": "PIV/CAC not required", + "nist": "IA-2(11)", + "sev": Severity.HIGH, + "cci": "CCI-000765", + }, + "V-238230": { + "title": "Unsigned kernel modules", + "nist": "SI-7", + "sev": Severity.HIGH, + "cci": "CCI-001749", + }, + "V-238298": { + "title": "FIPS 140 mode disabled", + "nist": "SC-13", + "sev": Severity.VERY_HIGH, + "cci": "CCI-002450", + }, + "V-238382": { + "title": "World-writable file owned by root", + "nist": "AC-3", + "sev": Severity.HIGH, + "cci": "CCI-000213", + }, + "V-242414": { + "title": "Kubernetes API not requiring auth", + "nist": "IA-2", + "sev": Severity.VERY_HIGH, + "cci": "CCI-000764", + }, + "V-242418": { + "title": "Blank password permitted", + "nist": "IA-5", + "sev": Severity.VERY_HIGH, + "cci": "CCI-000196", + }, } + +class StigsentryError(Exception): + """Raised for user-facing errors (bad input, missing files, parse failures).""" + + def parse_findings_file(path: Path) -> list[dict]: - """Accept JSON list of {stig_id, host, status} OR CSV with same cols.""" + """Accept JSON list of {stig_id, host, status} OR CSV/TSV with same cols. + + Raises StigsentryError with a clear message on parse failure. + Returns an empty list for an empty file. + """ + if not path.exists(): + raise StigsentryError(f"Input file not found: {path}") + + raw = path.read_text(encoding="utf-8", errors="replace").strip() + if not raw: + return [] + if path.suffix == ".json": - try: return json.loads(path.read_text()) - except: return [] + try: + data = json.loads(raw) + except json.JSONDecodeError as exc: + raise StigsentryError( + f"Invalid JSON in {path}: {exc}" + ) from exc + if not isinstance(data, list): + raise StigsentryError( + f"{path}: expected a JSON array of findings, got {type(data).__name__}" + ) + return data + if path.suffix in (".csv", ".tsv"): - with path.open() as f: - reader = csv.DictReader(f, delimiter="\t" if path.suffix == ".tsv" else ",") - return list(reader) + delimiter = "\t" if path.suffix == ".tsv" else "," + try: + reader = csv.DictReader(io.StringIO(raw), delimiter=delimiter) + rows = list(reader) + except csv.Error as exc: + raise StigsentryError(f"CSV parse error in {path}: {exc}") from exc + if reader.fieldnames is None: + return [] + required = {"stig_id", "status"} + missing = required - {f.strip() for f in (reader.fieldnames or [])} + if missing: + raise StigsentryError( + f"{path}: CSV is missing required columns: {sorted(missing)}" + ) + return rows + return [] + def scan(target=".", **opts): - r = ScanResult(tool_name="stigsentry", tool_version="0.1.0") + """Scan *target* (directory or single file) for STIG findings. + + Raises StigsentryError on unrecoverable input problems so callers can + display a clean message instead of a raw traceback. + """ p = Path(target) - files = list(p.glob("*.json")) + list(p.glob("*.csv")) if p.is_dir() else [p] + if not p.exists(): + raise StigsentryError(f"Target not found: {target}") + + r = ScanResult(tool_name="stigsentry", tool_version="0.1.0") + + if p.is_dir(): + files = list(p.glob("*.json")) + list(p.glob("*.csv")) + else: + files = [p] + r.items_scanned = len(files) + for f in files: - if not f.is_file(): continue - for finding in parse_findings_file(f): - sid = finding.get("stig_id") or finding.get("rule_id") or "" - status = (finding.get("status") or "").lower() - if status in ("pass","not_a_finding"): continue + if not f.is_file(): + continue + try: + raw_findings = parse_findings_file(f) + except StigsentryError as exc: + # Surface parse errors as LOW-severity informational findings so + # the scan still completes and the operator sees the problem. + r.add(Finding( + "SS-PARSE-ERR", + Severity.LOW, + str(exc), + location=str(f), + remediation="Fix the input file format and re-run.", + )) + continue + + for finding in raw_findings: + if not isinstance(finding, dict): + continue + sid = (finding.get("stig_id") or finding.get("rule_id") or "").strip() + status = (finding.get("status") or "").lower().strip() + if status in ("pass", "not_a_finding"): + continue + if not sid: + r.add(Finding( + "SS-NO-ID", + Severity.LOW, + "Finding row missing stig_id/rule_id", + location=str(f), + remediation="Populate stig_id or rule_id in the input file.", + )) + continue info = STIG_CONTROLS.get(sid) if not info: - r.add(Finding(f"SS-UNK-{sid}", Severity.LOW, - f"Unknown STIG ID: {sid}", location=str(f), - remediation="Add to STIG_CONTROLS table")) + r.add(Finding( + f"SS-UNK-{sid}", + Severity.LOW, + f"Unknown STIG ID: {sid}", + location=str(f), + remediation="Add to STIG_CONTROLS table", + )) continue - r.add(Finding(f"SS-{sid}", info["sev"], info["title"], - location=finding.get("host", str(f)), - nist_800_53=info["nist"], disa_stig=sid, cci=info["cci"], - remediation=f"Remediate per DISA {sid}, evidence to control {info['nist']}")) - r.finalize(); return r + r.add(Finding( + f"SS-{sid}", + info["sev"], + info["title"], + location=finding.get("host", str(f)), + nist_800_53=info["nist"], + disa_stig=sid, + cci=info["cci"], + remediation=( + f"Remediate per DISA {sid}, evidence to control {info['nist']}" + ), + )) + + r.finalize() + return r + def emit_poam(result: ScanResult, out: Path = None) -> str: """Emit eMASS-compatible POAM (Plan of Action & Milestones) CSV.""" - rows = [["Control","Weakness","Severity","SCD","POC","Status","Resources Required","Comments"]] + rows = [ + [ + "Control", "Weakness", "Severity", "SCD", "POC", + "Status", "Resources Required", "Comments", + ] + ] for f in result.findings: rows.append([ - f.nist_800_53 or "(none)", f.title, - f.severity.value, "TBD", "TBD", - "Open", "TBD", f"STIG {f.disa_stig} / CCI {f.cci}" + f.nist_800_53 or "(none)", + f.title, + f.severity.value, + "TBD", + "TBD", + "Open", + "TBD", + f"STIG {f.disa_stig} / CCI {f.cci}", ]) - import io buf = io.StringIO() writer = csv.writer(buf) writer.writerows(rows) text = buf.getvalue() - if out: out.write_text(text) + if out is not None: + try: + Path(out).write_text(text, encoding="utf-8") + except OSError as exc: + print(f"Warning: could not write POAM to {out}: {exc}", file=sys.stderr) return text diff --git a/tests/test_hardening.py b/tests/test_hardening.py new file mode 100644 index 0000000..fe66e2c --- /dev/null +++ b/tests/test_hardening.py @@ -0,0 +1,140 @@ +"""Tests covering hardened error-handling and edge-case paths.""" +from __future__ import annotations + +import json +import subprocess +import sys +import pytest + +from stigsentry.core import ( + StigsentryError, + emit_poam, + parse_findings_file, + scan, +) + + +# --------------------------------------------------------------------------- +# parse_findings_file +# --------------------------------------------------------------------------- + +def test_missing_file_raises(tmp_path): + """parse_findings_file raises StigsentryError for a non-existent file.""" + with pytest.raises(StigsentryError, match="not found"): + parse_findings_file(tmp_path / "ghost.json") + + +def test_malformed_json_raises(tmp_path): + """parse_findings_file raises StigsentryError for invalid JSON.""" + bad = tmp_path / "bad.json" + bad.write_text("{not valid json") + with pytest.raises(StigsentryError, match="Invalid JSON"): + parse_findings_file(bad) + + +def test_json_not_a_list_raises(tmp_path): + """parse_findings_file raises StigsentryError when JSON root is not a list.""" + obj = tmp_path / "obj.json" + obj.write_text(json.dumps({"stig_id": "V-238298", "status": "fail"})) + with pytest.raises(StigsentryError, match="expected a JSON array"): + parse_findings_file(obj) + + +def test_empty_json_file_returns_empty(tmp_path): + """Empty JSON file returns an empty list without error.""" + empty = tmp_path / "empty.json" + empty.write_text("") + assert parse_findings_file(empty) == [] + + +def test_empty_json_array_returns_empty(tmp_path): + """JSON file containing [] returns an empty list without error.""" + f = tmp_path / "zero.json" + f.write_text("[]") + assert parse_findings_file(f) == [] + + +def test_csv_missing_required_column_raises(tmp_path): + """CSV missing the 'status' column raises StigsentryError.""" + bad_csv = tmp_path / "bad.csv" + bad_csv.write_text("stig_id,host\nV-238298,web01\n") + with pytest.raises(StigsentryError, match="missing required columns"): + parse_findings_file(bad_csv) + + +# --------------------------------------------------------------------------- +# scan() +# --------------------------------------------------------------------------- + +def test_scan_nonexistent_target_raises(): + """scan() raises StigsentryError when the target path does not exist.""" + with pytest.raises(StigsentryError, match="not found"): + scan("/tmp/definitely_does_not_exist_xyzzy_12345") + + +def test_scan_empty_directory_returns_zero_findings(tmp_path): + """scan() on an empty directory produces zero findings without crashing.""" + result = scan(str(tmp_path)) + assert result.total_findings() == 0 + assert result.items_scanned == 0 + + +def test_scan_all_pass_returns_no_findings(tmp_path): + """Findings with status=pass are not included in results.""" + f = tmp_path / "all_pass.json" + f.write_text(json.dumps([ + {"stig_id": "V-238298", "host": "h1", "status": "pass"}, + {"stig_id": "V-238213", "host": "h1", "status": "not_a_finding"}, + ])) + result = scan(str(tmp_path)) + assert result.total_findings() == 0 + + +def test_scan_malformed_json_yields_parse_error_finding(tmp_path): + """A malformed JSON input file yields a parse-error finding rather than crashing.""" + bad = tmp_path / "corrupt.json" + bad.write_text("{bad") + result = scan(str(bad)) + ids = [f.id for f in result.findings] + assert any("PARSE-ERR" in fid for fid in ids) + + +def test_scan_finding_missing_stig_id(tmp_path): + """A finding row without stig_id/rule_id yields a SS-NO-ID finding.""" + f = tmp_path / "noid.json" + f.write_text(json.dumps([{"host": "h1", "status": "fail"}])) + result = scan(str(f)) + ids = [finding.id for finding in result.findings] + assert "SS-NO-ID" in ids + + +# --------------------------------------------------------------------------- +# emit_poam edge cases +# --------------------------------------------------------------------------- + +def test_emit_poam_empty_findings(): + """emit_poam on a result with no findings still produces a valid CSV header.""" + from cognis_mil import ScanResult + + r = ScanResult(tool_name="stigsentry") + r.finalize() + poam = emit_poam(r) + assert "Control,Weakness" in poam + # Only the header row — no data rows + lines = [ln for ln in poam.splitlines() if ln.strip()] + assert len(lines) == 1 + + +# --------------------------------------------------------------------------- +# CLI exit-code contract +# --------------------------------------------------------------------------- + +def test_cli_nonexistent_target_exits_2(): + """CLI exits with code 2 when the target path does not exist.""" + result = subprocess.run( + [sys.executable, "-m", "stigsentry", "/nonexistent/path/xyzzy"], + capture_output=True, + text=True, + ) + assert result.returncode == 2 + assert "error" in result.stderr.lower() diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 3634741..b4e92e1 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -12,7 +12,8 @@ def test_scan(): # passed ones not present assert "SS-V-238211" not in ids def test_poam_emit(tmp_path): - r = scan(str(D)); r.finalize() + r = scan(str(D)) + r.finalize() poam = emit_poam(r, tmp_path / "poam.csv") assert "Control,Weakness" in poam assert "SC-13" in poam