
Commit b070740

feat: wire PIX-32 normalize scripts and bump ai submodule
1 parent be2fc35 · commit b070740

3 files changed

Lines changed: 256 additions & 32 deletions

File tree

    ai
    scripts/data/run_normalize.py
    scripts/data/run_pix32_normalize_dedup.py

Submodule ai updated 38 files

scripts/data/run_normalize.py

Lines changed: 75 additions & 31 deletions
@@ -1,49 +1,93 @@
 #!/usr/bin/env python3
+"""
+Legacy normalize script — updated to use PIX-32 pipeline.
+
+Previously imported non-existent modules (scripts.ingestion.connectors.pubmed,
+scripts.ingestion.normalize.schema). Now delegates to the PIX-32
+NormalizationPipeline for all normalization work.
+
+Usage:
+    uv run python scripts/data/run_normalize.py --input data/raw/*.jsonl
+    uv run python scripts/data/run_normalize.py --input data/raw/ --output data/normalized/output.jsonl
+"""
+
 from __future__ import annotations
-import json
+
 import sys
 from pathlib import Path

 ROOT = Path(__file__).resolve().parents[2]
 if str(ROOT) not in sys.path:
     sys.path.insert(0, str(ROOT))

-from scripts.ingestion.connectors.pubmed import PubMedConnector
-from scripts.ingestion.normalize.schema import normalize_pubmed
-
-
-REQUIRED_FIELDS = ['id','source','text','topics','license','provenance']
-
-def _schema_check(obj: dict) -> None:
-    missing = [k for k in REQUIRED_FIELDS if k not in obj]
-    if missing:
-        raise ValueError(f"Normalized record missing fields: {missing}")
+from ai.core.pipelines.processing.normalization_pipeline import (
+    DedupStrategy,
+    NormalizationPipeline,
+)


 def main(args: list[str] | None = None) -> int:
     import argparse
-    parser = argparse.ArgumentParser()
-    parser.add_argument('--source', default='pubmed', choices=['pubmed'])
-    parser.add_argument('--out', default=str(ROOT / 'tmp_rovodev_normalized.jsonl'))
+
+    parser = argparse.ArgumentParser(
+        description="Normalize ingested JSONL data (PIX-32 pipeline).",
+    )
+    parser.add_argument(
+        "--input",
+        "-i",
+        nargs="+",
+        required=True,
+        help="Input JSONL file(s) or directory path(s).",
+    )
+    parser.add_argument(
+        "--output",
+        "-o",
+        default=None,
+        help="Output JSONL file path (default: output_normalized.jsonl).",
+    )
+    parser.add_argument(
+        "--reject-path",
+        "-r",
+        default=None,
+        help="Rejected records output path (default: output_rejected.jsonl).",
+    )
+    parser.add_argument(
+        "--dedup",
+        "-d",
+        choices=["none", "bloom", "similarity", "stage_aware"],
+        default="similarity",
+        help="Deduplication strategy (default: similarity).",
+    )
+    parser.add_argument(
+        "--verbose",
+        "-v",
+        action="store_true",
+        help="Enable debug logging.",
+    )
+
     ns = parser.parse_args(args or [])

-    out_path = Path(ns.out)
-    count = 0
-
-    with out_path.open('w', encoding='utf-8') as out:
-        if ns.source == 'pubmed':
-            conn = PubMedConnector()
-            normalizer = normalize_pubmed
-        else:
-            raise SystemExit(f"Unsupported source: {ns.source}")
-        for rec in conn.fetch():
-            norm = normalizer(rec.data)
-            norm['id'] = f"{ns.source}-{count+1}"
-            _schema_check(norm)
-            out.write(json.dumps(norm, ensure_ascii=False) + "\n")
-            count += 1
-    print(f"Wrote {count} records to {out_path}")
+    if ns.verbose:
+        import logging
+
+        logging.basicConfig(level=logging.DEBUG)
+
+    pipeline = NormalizationPipeline(
+        dedup_strategy=DedupStrategy(ns.dedup),
+    )
+
+    result = pipeline.run(
+        input_paths=ns.input,
+        output_path=ns.output,
+        reject_path=ns.reject_path,
+    )
+
+    print(result.summary())
+
+    if result.final_records == 0 and result.total_records > 0:
+        return 2
     return 0

-if __name__ == '__main__':
+
+if __name__ == "__main__":
     raise SystemExit(main())
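
The rewritten run_normalize.py is now a thin CLI wrapper around NormalizationPipeline. Below is a minimal sketch of the same flow driven programmatically, using only the constructor, run() arguments, and result fields visible in this diff; the input and output paths are placeholders.

    from ai.core.pipelines.processing.normalization_pipeline import (
        DedupStrategy,
        NormalizationPipeline,
    )

    # Placeholder paths; any JSONL file(s) or directory the pipeline accepts will do.
    pipeline = NormalizationPipeline(dedup_strategy=DedupStrategy("similarity"))
    result = pipeline.run(
        input_paths=["data/raw/example.jsonl"],
        output_path="data/normalized/example.jsonl",
        reject_path="data/normalized/example_rejected.jsonl",
    )

    print(result.summary())
    if result.final_records == 0 and result.total_records > 0:
        raise SystemExit(2)  # same "nothing survived" exit code the script uses
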
scripts/data/run_pix32_normalize_dedup.py

Lines changed: 180 additions & 0 deletions
@@ -0,0 +1,180 @@
+#!/usr/bin/env python3
+"""
+PIX-32: Normalize and Deduplicate Ingested Data — CLI Entry Point.
+
+Processes JSONL files through the full PIX-32 pipeline:
+1. Schema validation (PIX-30 canonical JSONL schema)
+2. Text normalization (unicode NFKC, whitespace cleanup)
+3. Key standardization (lower_snake_case)
+4. Deduplication (BloomFilter, similarity, or stage-aware)
+5. Provenance metadata attachment
+6. Output to normalized JSONL with rejection report
+
+Usage:
+    PIX32_FILES=data/raw/*.jsonl uv run python scripts/data/run_pix32_normalize_dedup.py
+    PIX32_FILES=data/raw/ uv run python scripts/data/run_pix32_normalize_dedup.py --output data/normalized/output.jsonl
+    PIX32_FILES=data/raw/ uv run python scripts/data/run_pix32_normalize_dedup.py --dedup stage_aware --enforce-license
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import sys
+from pathlib import Path
+
+# Ensure project root is on sys.path for ai.core imports
+ROOT = Path(__file__).resolve().parents[2]
+if str(ROOT) not in sys.path:
+    sys.path.insert(0, str(ROOT))
+
+from ai.core.pipelines.processing.normalization_pipeline import (
+    DedupStrategy,
+    NormalizationPipeline,
+)
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
+    datefmt="%Y-%m-%dT%H:%M:%S",
+)
+logger = logging.getLogger("pix32")
+
+
+def _progress_callback(current: int, total: int) -> None:
+    """Simple progress reporter."""
+    pct = (current / total * 100) if total > 0 else 0
+    logger.info("Progress: %d/%d (%.1f%%)", current, total, pct)
+
+
+def main(args: list[str] | None = None) -> int:
+    import argparse
+
+    parser = argparse.ArgumentParser(
+        description="PIX-32: Normalize and deduplicate ingested JSONL data.",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog=__doc__,
+    )
+
+    parser.add_argument(
+        "--files",
+        nargs="*",
+        default=None,
+        help="Input JSONL file(s) or directory path(s). Supports glob patterns. "
+        "Can also be set via PIX32_FILES env var (colon-separated).",
+    )
+    parser.add_argument(
+        "--output",
+        "-o",
+        default=None,
+        help="Output JSONL file path (default: output_normalized.jsonl).",
+    )
+    parser.add_argument(
+        "--reject-path",
+        "-r",
+        default=None,
+        help="Rejected records JSONL file path (default: output_rejected.jsonl).",
+    )
+    parser.add_argument(
+        "--dedup",
+        "-d",
+        choices=["none", "bloom", "similarity", "stage_aware"],
+        default="similarity",
+        help="Deduplication strategy (default: similarity).",
+    )
+    parser.add_argument(
+        "--similarity-threshold",
+        type=float,
+        default=0.85,
+        help="Similarity threshold for dedup (default: 0.85).",
+    )
+    parser.add_argument(
+        "--enforce-license",
+        action="store_true",
+        help="Reject records without a license field.",
+    )
+    parser.add_argument(
+        "--enforce-phi-scan",
+        action="store_true",
+        help="Reject records without phi_scan_passed field.",
+    )
+    parser.add_argument(
+        "--verbose",
+        "-v",
+        action="store_true",
+        help="Enable debug logging.",
+    )
+
+    ns = parser.parse_args(args or [])
+
+    # Resolve input files: --files arg > PIX32_FILES env var
+    input_files = ns.files
+    if not input_files:
+        env_files = os.environ.get("PIX32_FILES", "")
+        if env_files:
+            input_files = env_files.split(":")
+        else:
+            parser.error(
+                "No input files specified. Use --files or set PIX32_FILES env var "
+                "(colon-separated paths)."
+            )
+
+    if ns.verbose:
+        logging.getLogger().setLevel(logging.DEBUG)
+
+    strategy = DedupStrategy(ns.dedup)
+
+    logger.info("PIX-32 Pipeline starting")
+    logger.info("  Input: %s", input_files)
+    logger.info("  Output: %s", ns.output or "output_normalized.jsonl")
+    logger.info("  Reject path: %s", ns.reject_path or "output_rejected.jsonl")
+    logger.info("  Dedup: %s", ns.dedup)
+    logger.info("  Similarity: %.2f", ns.similarity_threshold)
+    logger.info("  Enforce license: %s", ns.enforce_license)
+    logger.info("  Enforce PHI: %s", ns.enforce_phi_scan)
+
+    pipeline = NormalizationPipeline(
+        dedup_strategy=strategy,
+        similarity_threshold=ns.similarity_threshold,
+        enforce_license=ns.enforce_license,
+        enforce_phi_scan=ns.enforce_phi_scan,
+        on_progress=_progress_callback,
+    )
+
+    try:
+        result = pipeline.run(
+            input_paths=list(input_files),
+            output_path=ns.output,
+            reject_path=ns.reject_path,
+        )
+    except Exception as exc:
+        logger.error("Pipeline failed: %s", exc, exc_info=True)
+        return 1
+
+    # Print summary
+    print()
+    print(result.summary())
+    print()
+
+    # Exit with error if no valid records produced
+    if result.final_records == 0 and result.total_records > 0:
+        logger.error("No valid records produced after normalization and deduplication")
+        return 2
+
+    # Exit with warning if rejection rate is high
+    if result.total_records > 0:
+        rejection_rate = result.rejected_records / result.total_records
+        if rejection_rate > 0.5:
+            logger.warning(
+                "High rejection rate: %.1f%% (%d/%d records rejected)",
+                rejection_rate * 100,
+                result.rejected_records,
+                result.total_records,
+            )
+
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
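
Steps 2 and 3 in the docstring (unicode NFKC text normalization with whitespace cleanup, and lower_snake_case key standardization) can be illustrated with a short stdlib-only sketch. This is not the NormalizationPipeline implementation, only an example of what those transformations do to a record:

    import re
    import unicodedata

    def normalize_text(text: str) -> str:
        """Illustrative only: NFKC unicode normalization plus whitespace collapse."""
        text = unicodedata.normalize("NFKC", text)
        return re.sub(r"\s+", " ", text).strip()

    def to_lower_snake_case(key: str) -> str:
        """Illustrative only: standardize a record key to lower_snake_case."""
        key = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", key)  # split camelCase boundaries
        key = re.sub(r"[^0-9a-zA-Z]+", "_", key)           # map other separators to "_"
        return key.strip("_").lower()

    # "\ufb01" is the "fi" ligature; "\u00a0" is a non-breaking space.
    record = {"SourceName": "PubMed Central", "Text": "\ufb01brosis\u00a0study"}
    normalized = {to_lower_snake_case(k): normalize_text(v) for k, v in record.items()}
    # -> {"source_name": "PubMed Central", "text": "fibrosis study"}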
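
The entry point can also be exercised in-process, which makes the input resolution (--files first, then the colon-separated PIX32_FILES fallback) and the exit-code contract (0 success, 1 pipeline failure, 2 no valid records) easy to check. The import below assumes scripts/data is importable from the project root; normally the script is run via uv run python as shown in its docstring, and the data paths are placeholders.

    import os

    # Assumed importable path; in practice the script is invoked as
    #   uv run python scripts/data/run_pix32_normalize_dedup.py ...
    from scripts.data.run_pix32_normalize_dedup import main

    # Explicit inputs:
    rc = main(["--files", "data/raw/sample.jsonl", "--dedup", "stage_aware", "--enforce-license"])

    # Or rely on the PIX32_FILES fallback, colon-separated per the --files help text:
    os.environ["PIX32_FILES"] = "data/raw/a.jsonl:data/raw/b.jsonl"
    rc = main(["--dedup", "bloom"])

    assert rc in (0, 2)  # 1 is only returned when the pipeline itself raises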
