|
| 1 | +"""Diagnostic redaction CLI helpers.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +import argparse |
| 6 | +import json |
| 7 | +import re |
| 8 | +from pathlib import Path |
| 9 | + |
| 10 | + |
| 11 | +def _redact(raw: str) -> tuple[str, dict[str, int]]: |
| 12 | + patterns: list[tuple[str, re.Pattern[str], str]] = [ |
| 13 | + ("cookies", re.compile(r"(?i)(\bcookie\s*[:=]\s*)([^\n]+)"), r"\1<redacted-cookie>"), |
| 14 | + ("bearer", re.compile(r"(?i)\bbearer\s+[a-z0-9._~+/=-]+"), "Bearer <redacted-token>"), |
| 15 | + ("oauth", re.compile(r'(?i)("?(?:access_token|refresh_token|oauth_token|id_token)"?\s*[:=]\s*)(".*?"|[^,\s]+)'), r'\1"<redacted-token>"'), |
| 16 | + ("api_keys", re.compile(r'(?i)("?(?:api[_-]?key|x-api-key|apikey|client_secret|authorization)"?\s*[:=]\s*)(".*?"|[^,\s]+)'), r'\1"<redacted-secret>"'), |
| 17 | + ("secrets", re.compile(r'(?i)("?(?:secret|password|token)"?\s*[:=]\s*)(".*?"|[^,\s]+)'), r'\1"<redacted-secret>"'), |
| 18 | + ("sensitive_ids", re.compile(r'(?i)("?(?:user|account|session|device|customer|tenant|workspace|organization|org|principal|subject)_id"?\s*[:=]\s*)(".*?"|[^,\s]+)'), r'\1"<redacted-id>"'), |
| 19 | + ("model_prompts", re.compile(r'(?i)("?(?:prompt|model_prompt|system_prompt|user_prompt)"?\s*[:=]\s*)(".*?"|[^,\n]+)'), r'\1"<redacted-prompt>"'), |
| 20 | + ("policy_marked", re.compile(r"(?is)<policy-marked>.*?</policy-marked>"), "<policy-marked><redacted-policy-snippet></policy-marked>"), |
| 21 | + ] |
| 22 | + counts: dict[str, int] = {} |
| 23 | + redacted = raw |
| 24 | + for name, pattern, replacement in patterns: |
| 25 | + redacted, count = pattern.subn(replacement, redacted) |
| 26 | + if count: |
| 27 | + counts[name] = count |
| 28 | + return redacted, counts |
| 29 | + |
| 30 | + |
def _redaction_failure(message: str) -> int:
    """Print a failing ``DiagnosticRedaction`` JSON report and return exit code 1."""
    report = {"type": "DiagnosticRedaction", "result": "fail", "errors": [message]}
    print(json.dumps(report, indent=2, sort_keys=True))
    return 1


def redact_cmd(args: argparse.Namespace) -> int:
    """Redact the file named by ``args.input`` and emit the result.

    With ``args.output`` set, the redacted text is written to that path
    (parent directories are created) and a JSON report with the redaction
    counts is printed. Otherwise the redacted text itself is printed.

    Args:
        args: Parsed CLI namespace providing ``input`` and ``output``.

    Returns:
        ``0`` on success; ``1`` after printing a JSON failure report when the
        input is missing or unreadable, or the output cannot be written.
    """
    input_path = Path(args.input).expanduser().resolve()
    if not input_path.exists():
        return _redaction_failure(f"missing input file: {input_path}")

    # exists() is also true for directories and unreadable files, and the
    # content may not be valid UTF-8; report those the same way as a missing
    # file instead of crashing with a traceback.
    try:
        raw = input_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        return _redaction_failure(f"unable to read input file: {input_path}: {exc}")

    redacted, counts = _redact(raw)

    if not args.output:
        print(redacted)
        return 0

    output_path = Path(args.output).expanduser().resolve()
    try:
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(redacted, encoding="utf-8")
    except OSError as exc:
        return _redaction_failure(f"unable to write output file: {output_path}: {exc}")

    report = {
        "type": "DiagnosticRedaction",
        "result": "pass",
        "input": str(input_path),
        "output": str(output_path),
        "redactionCounts": counts,
    }
    print(json.dumps(report, indent=2, sort_keys=True))
    return 0
| 47 | + |
| 48 | + |
def build_parser() -> argparse.ArgumentParser:
    """Build the ``sourceosctl diagnostics`` argument parser.

    The parser requires a sub-command (stored as ``diagnostics_command``).
    The only one registered here is ``redact``, which takes a positional
    ``input`` and an optional ``--output`` and dispatches to ``redact_cmd``
    through the ``func`` default.
    """
    root = argparse.ArgumentParser(
        prog="sourceosctl diagnostics",
        description="Diagnostic helpers",
    )
    commands = root.add_subparsers(dest="diagnostics_command", required=True)

    redact_parser = commands.add_parser(
        "redact",
        help="Redact sensitive tokens and snippets from diagnostic exports",
    )
    redact_parser.add_argument("input")
    redact_parser.add_argument("--output", default=None)
    redact_parser.set_defaults(func=redact_cmd)

    return root
| 57 | + |
| 58 | + |
def diagnostics_main(argv: list[str] | None = None) -> int:
    """Parse *argv* and run the selected diagnostics sub-command.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv[1:]``.

    Returns:
        The handler's return value, with any falsy result (such as ``None``)
        normalized to ``0``.
    """
    namespace = build_parser().parse_args(argv)
    exit_code = namespace.func(namespace)
    return exit_code if exit_code else 0
0 commit comments