-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
163 lines (119 loc) · 5.25 KB
/
main.py
File metadata and controls
163 lines (119 loc) · 5.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#!/usr/bin/env python3
"""Mental Health AI Pipeline - Main Orchestration Script.
This script orchestrates the complete data pipeline for therapeutic conversation
generation, from sourcing to deployment. Run from project root:
uv run python -m ai.main
"""
import logging
import ai.training.ready_packages.scripts.upload_to_s3 as s3_uploader
from ai.sourcing.academic import AcademicSourcingEngine
from ai.pipelines.orchestrator.alignment.dpo_generator import DPOGenerator
from ai.pipelines.orchestrator.orchestration.pipeline_runner import PipelineRunner
from ai.pipelines.orchestrator.processing.transcript_ingestor import TranscriptIngestor
from ai.pipelines.orchestrator.simulation.session_simulator import SessionSimulator
from ai.pipelines.orchestrator.sourcing.multi_source_ingestor import run_all_ingestors
from ai.pipelines.orchestrator.synthesis.dataset_synthesizer import DatasetSynthesizer
from ai.pipelines.orchestrator.therapies.act_integration import ACTIntegration
from ai.pipelines.orchestrator.therapies.cbt_integration import CBTIntegration
from ai.pipelines.orchestrator.therapies.crisis_expansion import CrisisScenarioExpander
from ai.pipelines.orchestrator.therapies.dbt_integration import DBTIntegration
from ai.pipelines.orchestrator.therapies.emdr_integration import EMDRIntegration
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("MentalHealthPipeline")
# --- Phase 0: External Acquisition (Direct S3 Stream) ---
def run_external_ingestion():
"""Run External Data Ingestion - streams directly to OVH S3."""
return run_all_ingestors(bucket="pixel-data", prefix="datasets/training_v3/")
# --- Phase 1: Internal Sourcing ---
def run_sourcing():
"""Run sourcing tasks (ArXiv)."""
engine = AcademicSourcingEngine()
return engine.run_sourcing_pipeline()
def run_voice_ingestion():
"""Run Voice Data Ingestion."""
ingestor = TranscriptIngestor()
output = ingestor.process_batch(batch_size=200)
return f"Ingested voice data to {output}" if output else "No voice data processed."
# --- Phase 2: Reasoning & Therapy Generation ---
def run_cbt():
cbt = CBTIntegration()
data = cbt.generate_batch_content(count=50) # Increased
return cbt.export_data(data)
def run_dbt():
dbt = DBTIntegration()
data = dbt.generate_batch_content(count=50) # Increased
return dbt.export_data(data)
def run_emdr():
emdr = EMDRIntegration()
data = emdr.generate_batch_content(count=50) # Increased
return emdr.export_data(data)
def run_act():
act = ACTIntegration()
data = act.generate_batch_content(count=50) # Increased
return act.export_data(data)
# --- Phase 3: Edge Cases ---
def run_crisis_expansion():
"""Run Crisis/Nightmare Fuel Generation."""
expander = CrisisScenarioExpander()
scenarios, output = expander.generate_batch(count=20) # Increased
return f"Generated {len(scenarios)} nightmare scenarios to {output}"
# --- Phase 4: Simulation ---
def run_simulation():
"""Run Session Simulation (including Journaling)."""
sim = SessionSimulator()
data = sim.generate_batch(count=20) # Increased
return f"Generated {len(data)} sessions."
# --- Phase 5: Synthesis ---
def run_synthesis():
"""Run Final Synthesis."""
synth = DatasetSynthesizer()
dataset = synth.synthesize_dataset(format_type="alpaca")
splits = synth.split_dataset(dataset)
results = []
for split, items in splits.items():
output = synth.output_path / f"final_{split}.jsonl"
with open(output, "w") as f:
import json
for item in items:
f.write(json.dumps(item) + "\n")
results.append(f"{split}: {len(items)}")
return ", ".join(results)
# --- Phase 6: Alignment ---
def run_dpo_generation():
"""Run DPO Pair Generation."""
dpo = DPOGenerator()
voice_file = (
"ai/training/ready_packages/datasets/stage4_voice/"
"processed_transcripts/voice_training_data_001.json"
)
data = dpo.process_voice_data(voice_file)
if data:
return dpo.export_dpo(data)
return "No DPO pairs generated (missing voice data?)"
# --- Phase 7: Deployment ---
def run_s3_upload():
"""Run S3 Upload."""
try:
s3_uploader.upload_final_artifacts()
return "Upload initiated (check logs)."
except Exception as e:
return f"Upload failed: {e}"
def main():
runner = PipelineRunner()
# Execute Pipeline in Order
runner.run_stage("External Dataset Ingestion (HF)", run_external_ingestion)
runner.run_stage("Academic Sourcing (ArXiv)", run_sourcing)
runner.run_stage("Voice Ingestion (Transcripts)", run_voice_ingestion)
runner.run_stage("CBT Generation (LLM-Augmented)", run_cbt)
runner.run_stage("DBT Generation", run_dbt)
runner.run_stage("EMDR Generation", run_emdr)
runner.run_stage("ACT Generation", run_act)
runner.run_stage("Crisis Expansion (Nightmare Fuel)", run_crisis_expansion)
runner.run_stage("Multi-Turn Simulation (Journaling)", run_simulation)
runner.run_stage("Final Synthesis (Instruction Tuning)", run_synthesis)
runner.run_stage("Alignment Data Generation (DPO)", run_dpo_generation)
runner.run_stage("S3 Upload", run_s3_upload)
runner.report()
if __name__ == "__main__":
main()