-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprep.py
More file actions
executable file
·1521 lines (1313 loc) · 60.2 KB
/
prep.py
File metadata and controls
executable file
·1521 lines (1313 loc) · 60.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
prep.py - Interview Prep Content Pipeline
==========================================
Automates: Syllabus -> Content (per episode) -> Package (Gem + NotebookLM)
Usage:
python prep.py init <profile-name> # Create new profile skeleton
python prep.py setup --profile P # Generate domain files (3 API calls)
python prep.py all --profile P # Full pipeline
python prep.py syllabus --profile P # Generate agendas only
python prep.py content --profile P [--episode N] # Generate content
python prep.py add <file> --profile P [--gem-slot N] # Distill doc -> content -> package
python prep.py package [--profile P] # Repackage outputs
python prep.py render <file> [--profile P] # Substitute env vars, print to stdout
python prep.py status [--profile P] # Show what exists
Setup:
pip install -r requirements.txt
cp .env.example .env # edit .env with your API key
set -a && source .env && set +a
python prep.py setup --profile <name>
"""
import argparse
import math
import os
import re
import string
import sys
import time
from pathlib import Path
# ---------------------------------------------------------------------------
# CONFIG
# ---------------------------------------------------------------------------
# Directory layout, all anchored at the directory containing this script.
# set_profile() later rebinds the OUTPUTS/* and IN_* names to per-profile paths.
BASE_DIR = Path(__file__).parent
PROMPTS = BASE_DIR / "prompts"
INPUTS = BASE_DIR / "inputs"
OUTPUTS = BASE_DIR / "outputs"
SYLLABUS_DIR = OUTPUTS / "syllabus"
EPISODES_DIR = OUTPUTS / "episodes"
GEM_DIR = OUTPUTS / "gem"
NLM_DIR = OUTPUTS / "notebooklm"
RAW_DIR = OUTPUTS / "raw"
IN_AGENDAS = INPUTS / "agendas"
IN_EPISODES = INPUTS / "episodes"
IN_MISC = INPUTS / "misc"
# Model name and "as of" date, overridable through environment variables.
MODEL = os.environ.get("OPENAI_MODEL", "gpt-5.2-pro")
AS_OF = os.environ.get("AS_OF_DATE", "Feb 2026")
# Role config — set in .env to customize for your target role
# NOTE: these must be defined before SYLLABUS_INSTRUCTIONS etc. (which use them)
ROLE = os.environ.get("PREP_ROLE", "Staff Engineer")
COMPANY = os.environ.get("PREP_COMPANY", "a top tech company")
DOMAIN = os.environ.get("PREP_DOMAIN", "Security & Infrastructure")
AUDIENCE = os.environ.get("PREP_AUDIENCE", "Senior Software Engineers")
# Episode counts; int() raises ValueError at import time if the env value is not an integer.
_CORE_COUNT = int(os.environ.get("PREP_CORE_EPISODES", "12"))
_FRONTIER_COUNT = int(os.environ.get("PREP_FRONTIER_EPISODES", "3"))
# Core episodes are numbered 1..N; frontier episodes continue directly after them.
CORE_EPS = list(range(1, _CORE_COUNT + 1))
FRONTIER_EPS = list(range(_CORE_COUNT + 1, _CORE_COUNT + _FRONTIER_COUNT + 1))
ALL_EPS = CORE_EPS + FRONTIER_EPS
def frontier_map(core_count=None, frontier_count=None):
    """Return a dict mapping frontier digest letters to episode numbers.

    Frontier episodes are numbered directly after the core episodes, so 12
    core and 3 frontier episodes give {"A": 13, "B": 14, "C": 15}. A count
    passed as None falls back to the module-level configuration.
    """
    core_total = _CORE_COUNT if core_count is None else core_count
    digest_total = _FRONTIER_COUNT if frontier_count is None else frontier_count
    mapping = {}
    for offset in range(digest_total):
        mapping[chr(ord("A") + offset)] = core_total + offset + 1
    return mapping
def gem_slot(ep, core_count=None, frontier_eps=None):
    """Return the gem file slot number that episode ``ep`` belongs to.

    Core episodes share slots in pairs (1-2 -> slot 1, 3-4 -> slot 2, ...).
    All frontier episodes share the slot right after the core slots. Any
    other episode number lands in the slot after that (or directly after the
    core slots when there are no frontier episodes). Arguments left as None
    fall back to the module-level configuration.
    """
    core_total = _CORE_COUNT if core_count is None else core_count
    frontier_list = FRONTIER_EPS if frontier_eps is None else frontier_eps
    paired_slots = math.ceil(core_total / 2)
    if 1 <= ep <= core_total:
        return (ep - 1) // 2 + 1
    if ep in frontier_list:
        return paired_slots + 1
    trailing_offset = 2 if frontier_list else 1
    return paired_slots + trailing_offset
def _total_gem_slots(core_count=None, frontier_count=None):
"""Total number of gem slots: ceil(core/2) + (1 if frontiers) + 1 misc."""
if core_count is None: core_count = _CORE_COUNT
if frontier_count is None: frontier_count = _FRONTIER_COUNT
return math.ceil(core_count / 2) + (1 if frontier_count > 0 else 0) + 1
def build_syllabus_runs(core_count, frontier_count, batch_size=4):
    """Build the ordered list of syllabus runs from the episode counts.

    The plan is: one SCAFFOLD run, then one CORE_BATCH run per ``batch_size``
    core episodes, each followed by a FRONTIER_DIGEST run while digest letters
    remain, then any leftover digests, and finally a FINAL_MERGE run. Each run
    is a dict with the keys "mode", "core" and "frontier".
    """
    def _run(mode, core="", frontier=""):
        return {"mode": mode, "core": core, "frontier": frontier}

    plan = [_run("SCAFFOLD")]
    batch_total = math.ceil(core_count / batch_size) if core_count > 0 else 0
    digest_letters = list(string.ascii_uppercase[:frontier_count])

    for batch_idx in range(batch_total):
        first = batch_idx * batch_size + 1
        last = min(first + batch_size - 1, core_count)
        # A single-episode batch is written "9", a multi-episode batch "9-12".
        span = f"{first}-{last}" if first != last else str(first)
        plan.append(_run("CORE_BATCH", core=span))
        if batch_idx < len(digest_letters):
            plan.append(_run("FRONTIER_DIGEST", frontier=digest_letters[batch_idx]))

    # Digests without a matching core batch run after all core batches.
    plan.extend(
        _run("FRONTIER_DIGEST", frontier=letter)
        for letter in digest_letters[batch_total:]
    )
    plan.append(_run("FINAL_MERGE"))
    return plan
# Default run plan derived from the env-configured counts; _reconfigure() rebuilds it.
SYLLABUS_RUNS = build_syllabus_runs(_CORE_COUNT, _FRONTIER_COUNT)
def _reconfigure(core_count=12, frontier_count=3):
    """Rebuild every count-derived module global from the two episode counts.

    Rebinds _CORE_COUNT, _FRONTIER_COUNT, CORE_EPS, FRONTIER_EPS, ALL_EPS and
    SYLLABUS_RUNS. Used by tests and profile loading.
    """
    global _CORE_COUNT, _FRONTIER_COUNT, CORE_EPS, FRONTIER_EPS, ALL_EPS, SYLLABUS_RUNS
    _CORE_COUNT, _FRONTIER_COUNT = core_count, frontier_count
    first_frontier = core_count + 1
    CORE_EPS = list(range(1, first_frontier))
    FRONTIER_EPS = list(range(first_frontier, first_frontier + frontier_count))
    ALL_EPS = [*CORE_EPS, *FRONTIER_EPS]
    SYLLABUS_RUNS = build_syllabus_runs(core_count, frontier_count)
def _frontier_range_str():
    """Describe the frontier episode numbers, e.g. '13-15', '13' or '(none)'."""
    if not FRONTIER_EPS:
        return "(none)"
    first, last = FRONTIER_EPS[0], FRONTIER_EPS[-1]
    if len(FRONTIER_EPS) > 1:
        return f"{first}-{last}"
    return str(first)
def _frontier_map_str():
    """Render one line per frontier digest for the {FRONTIER_MAP} placeholder.

    e.g. ' - Digest A = Episode 13 (covers core Episodes 1-4)\n ...'

    Digest N is paired with the N-th batch of four core episodes. A batch that
    holds a single episode is written as "Episode 9" rather than "Episodes 9-9".
    A digest with no matching core batch (more digests than batches) is
    described as following all core episodes instead of printing an inverted
    range such as "5-4"; build_syllabus_runs schedules such digests after the
    last core batch.
    """
    batch_size = 4  # must agree with build_syllabus_runs' default batch_size
    lines = []
    for letter, ep_num in sorted(frontier_map().items(), key=lambda item: item[1]):
        idx = ord(letter) - ord("A")
        start = idx * batch_size + 1
        end = min((idx + 1) * batch_size, _CORE_COUNT)
        if start > end:
            note = "follows all core Episodes"
        elif start == end:
            note = f"covers core Episode {start}"
        else:
            note = f"covers core Episodes {start}-{end}"
        lines.append(f" - Digest {letter} = Episode {ep_num} ({note})")
    return "\n".join(lines) if lines else " (no frontier digests)"
def _listening_order_str():
    """Render the recommended listening order for the {LISTENING_ORDER} placeholder.

    e.g. 'Episodes 1-4 -> Episode 13 (Frontier Digest A) -> Episodes 5-8 -> ...'

    Core episodes are grouped in batches of four, each followed by its frontier
    digest when one exists. Digests that have no matching core batch are
    appended at the end (previously they were dropped), mirroring the order in
    which build_syllabus_runs schedules them. A single-episode batch is written
    as "Episode 9" rather than "Episodes 9-9".
    """
    fm = frontier_map()
    batch_size = 4  # must agree with build_syllabus_runs' default batch_size
    parts = []
    placed = set()
    for i in range(0, _CORE_COUNT, batch_size):
        start = i + 1
        end = min(i + batch_size, _CORE_COUNT)
        parts.append(f"Episode {start}" if start == end else f"Episodes {start}-{end}")
        letter = chr(ord("A") + i // batch_size)
        if letter in fm:
            parts.append(f"Episode {fm[letter]} (Frontier Digest {letter})")
            placed.add(letter)
    # Leftover digests (more digests than core batches) come after all core episodes.
    for letter, ep in sorted(fm.items(), key=lambda item: item[1]):
        if letter not in placed:
            parts.append(f"Episode {ep} (Frontier Digest {letter})")
    return " -> ".join(parts)
# ---------------------------------------------------------------------------
# DOMAIN CONTENT (domain-specific prompt injection)
# ---------------------------------------------------------------------------
# Mapping of placeholder marker name -> replacement text, consumed by _inject_domain().
_DOMAIN = {} # marker -> content, populated by set_profile()
# Stub comment prefix — files starting with this are considered empty stubs
_STUB_PREFIX = "<!-- STUB:"
# Domain file names — used by init, setup, preflight, and status
_DOMAIN_FILES = ["seeds.md", "coverage.md", "lenses.md", "gem-sections.md"]
def _parse_domain_sections(text):
"""Parse <!-- MARKER --> delimited sections from domain file content."""
result = {}
current_marker = None
current_lines = []
for line in text.split("\n"):
stripped = line.strip()
if re.match(r"^<!--\s+(\w+)\s+-->$", stripped):
if current_marker:
result[current_marker] = "\n".join(current_lines).strip()
current_marker = re.match(r"^<!--\s+(\w+)\s+-->$", stripped).group(1)
current_lines = []
elif current_marker is not None:
current_lines.append(line)
if current_marker:
result[current_marker] = "\n".join(current_lines).strip()
return result
def _load_domain(profile_name):
    """Collect marker sections from every .md file in profiles/{name}/domain/.

    Files are read in sorted order and later files overwrite markers defined
    by earlier ones. A file with no marker sections triggers a warning.
    Returns an empty dict when the domain directory does not exist.
    """
    domain_dir = BASE_DIR / "profiles" / profile_name / "domain"
    merged = {}
    if domain_dir.is_dir():
        for md_path in sorted(domain_dir.iterdir()):
            if md_path.suffix != ".md":
                continue
            parsed = _parse_domain_sections(md_path.read_text(encoding="utf-8"))
            if not parsed:
                print(f" WARNING: {md_path.name} has no <!-- MARKER --> sections")
            merged.update(parsed)
    return merged
def _inject_domain(text, domain=None):
"""Replace {MARKER} placeholders with domain content."""
if domain is None:
domain = _DOMAIN
for marker, content in domain.items():
text = text.replace("{" + marker + "}", content)
return text
def _is_stub(filepath):
"""Check if a domain file is a stub (starts with STUB comment)."""
if not filepath.exists():
return True
text = filepath.read_text(encoding="utf-8").strip()
return not text or text.startswith(_STUB_PREFIX)
def _needs_setup(profile_name):
    """Report whether at least one expected domain file is still a stub (setup not yet run)."""
    domain_dir = BASE_DIR / "profiles" / profile_name / "domain"
    for filename in _DOMAIN_FILES:
        if _is_stub(domain_dir / filename):
            return True
    return False
def _preflight_check(profile_name, command):
    """Abort early if the profile is incomplete, before any API spend.

    Nothing is checked for "setup" (it creates the domain files) or "all"
    (it runs setup itself when needed). For every other command the process
    exits with status 1 on the first stub/missing domain file or the first
    missing prompt template.
    """
    domain_dir = BASE_DIR / "profiles" / profile_name / "domain"
    if command in ("setup", "all"):
        return

    # 1. Every domain file must exist and hold real content.
    stub_name = next(
        (name for name in _DOMAIN_FILES if _is_stub(domain_dir / name)), None
    )
    if stub_name is not None:
        print(f"ERROR: domain/{stub_name} is empty or missing.")
        print(f" Run 'python prep.py setup --profile {profile_name}' or see prompts/intake.md")
        sys.exit(1)

    # 2. Every shared prompt template must be present.
    for template in ("syllabus", "content", "distill"):
        template_path = PROMPTS / f"{template}.md"
        if template_path.exists():
            continue
        print(f"ERROR: {template_path} not found")
        sys.exit(1)
# ---------------------------------------------------------------------------
# PROFILES
# ---------------------------------------------------------------------------
# Frontmatter keys load_profile() accepts without printing an "unknown field" warning.
_PROFILE_KNOWN_FIELDS = {
"role", "company", "domain", "audience",
"core_episodes", "frontier_episodes",
"model", "effort", "as_of",
}
# Keys that must be present and non-blank; load_profile() exits otherwise.
_PROFILE_REQUIRED_FIELDS = {"role", "company", "domain"}
# Keys whose values load_profile() converts to int and range-checks.
_PROFILE_INT_FIELDS = {"core_episodes", "frontier_episodes"}
def load_profile(name):
    """Read profiles/{name}/profile.md and return its frontmatter as a dict.

    Only simple ``key: value`` lines between the first two ``---`` lines are
    understood. Keys are lower-cased and values have surrounding quotes
    removed. Blank optional values are dropped, unknown keys are kept with a
    warning, and integer fields are converted to int. Any validation failure
    prints an error and exits with status 1.
    """
    profile_dir = BASE_DIR / "profiles" / name
    if not profile_dir.is_dir():
        print(f"ERROR: profile '{name}' not found at {profile_dir}/")
        sys.exit(1)
    profile_path = profile_dir / "profile.md"
    if not profile_path.exists():
        print(f"ERROR: {profile_path} not found. Run 'python prep.py init {name}' first.")
        sys.exit(1)

    lines = profile_path.read_text(encoding="utf-8").split("\n")

    # Locate the two '---' fences that bracket the frontmatter.
    fence_rows = [row for row, content in enumerate(lines) if content.strip() == "---"]
    if len(fence_rows) < 2:
        print(f"ERROR: {profile_path} has no YAML frontmatter (expected --- delimiters)")
        sys.exit(1)
    opening, closing = fence_rows[0], fence_rows[1]

    config = {}
    for entry in lines[opening + 1:closing]:
        entry = entry.strip()
        # Skip blanks, comments, and anything that is not "key: value".
        if not entry or entry.startswith("#") or ":" not in entry:
            continue
        raw_key, _, raw_value = entry.partition(":")
        key = raw_key.strip().lower()
        value = raw_value.strip().strip('"').strip("'")
        if not value:
            if key in _PROFILE_REQUIRED_FIELDS:
                print(f"ERROR: {profile_path.name} field '{key}' is blank — add a value.")
                sys.exit(1)
            continue
        if key not in _PROFILE_KNOWN_FIELDS:
            print(f"WARNING: unknown field '{key}' in {profile_path.name}")
        config[key] = value

    # Required fields must have survived parsing.
    for required in _PROFILE_REQUIRED_FIELDS:
        if required not in config:
            print(f"ERROR: {profile_path.name} missing required field '{required}'.")
            sys.exit(1)

    # Integer fields: frontier_episodes may be 0, the others must be >= 1.
    zero_allowed = {"frontier_episodes"}
    for int_field in _PROFILE_INT_FIELDS:
        if int_field not in config:
            continue
        floor = 0 if int_field in zero_allowed else 1
        try:
            number = int(config[int_field])
        except (ValueError, TypeError):
            number = None
        if number is None or number < floor:
            label = "non-negative" if int_field in zero_allowed else "positive"
            print(f"ERROR: {int_field} must be a {label} integer, got '{config[int_field]}'.")
            sys.exit(1)
        config[int_field] = number
    return config
def set_profile(name):
    """Activate a profile: rebind directories, config vars and episode counts.

    Loads the profile frontmatter and its domain content, points every
    output/input directory global at profiles/{name}/..., overrides the role
    and model settings with any values the profile provides, and re-derives
    the episode lists when the profile sets either count. Returns the parsed
    profile config dict.
    """
    global OUTPUTS, SYLLABUS_DIR, EPISODES_DIR, GEM_DIR, NLM_DIR, RAW_DIR
    global IN_AGENDAS, IN_EPISODES, IN_MISC
    global ROLE, COMPANY, DOMAIN, AUDIENCE, MODEL, EFFORT, AS_OF
    global _DOMAIN

    config = load_profile(name)
    # Domain content feeds the {MARKER} prompt placeholders.
    _DOMAIN = _load_domain(name)

    # Per-profile directories (PROMPTS stays shared across profiles).
    profile_dir = BASE_DIR / "profiles" / name
    inputs_dir = profile_dir / "inputs"
    OUTPUTS = profile_dir / "outputs"
    SYLLABUS_DIR, EPISODES_DIR = OUTPUTS / "syllabus", OUTPUTS / "episodes"
    GEM_DIR, NLM_DIR = OUTPUTS / "gem", OUTPUTS / "notebooklm"
    RAW_DIR = OUTPUTS / "raw"
    IN_AGENDAS = inputs_dir / "agendas"
    IN_EPISODES = inputs_dir / "episodes"
    IN_MISC = inputs_dir / "misc"

    # Profile values win; anything the profile omits keeps its current value.
    ROLE = config.get("role", ROLE)
    COMPANY = config.get("company", COMPANY)
    DOMAIN = config.get("domain", DOMAIN)
    AUDIENCE = config.get("audience", AUDIENCE)
    MODEL = config.get("model", MODEL)
    EFFORT = config.get("effort", EFFORT)
    AS_OF = config.get("as_of", AS_OF)

    # Re-derive episode lists only when the profile overrides a count.
    core = config.get("core_episodes")
    frontier = config.get("frontier_episodes")
    if not (core is None and frontier is None):
        _reconfigure(
            _CORE_COUNT if core is None else core,
            _FRONTIER_COUNT if frontier is None else frontier,
        )
    return config
# ---------------------------------------------------------------------------
# OPENAI CLIENT
# ---------------------------------------------------------------------------
# Reasoning effort requested from the model; set_profile() may override it from the profile.
EFFORT = os.environ.get("OPENAI_EFFORT", "xhigh") # xhigh | high | medium | low
VERBOSITY = os.environ.get("OPENAI_VERBOSITY", "") # "" = auto-detect from model
# Passed as max_output_tokens on every request; int() raises ValueError on a non-integer env value.
MAX_OUTPUT = int(os.environ.get("OPENAI_MAX_TOKENS", "16000"))
# Model family capabilities: (supports_reasoning, default_verbosity, allowed_efforts)
# Keys are model-name prefixes; _model_capabilities() picks the longest matching prefix.
_MODEL_CAPS = {
"gpt-5.2-pro": (True, "high", {"medium", "high", "xhigh"}),
"gpt-5.2": (True, "high", {"none", "low", "medium", "high", "xhigh"}),
"o3": (True, "medium", {"low", "medium", "high"}),
"o4-mini": (True, "medium", {"low", "medium", "high"}),
"o4": (True, "medium", {"low", "medium", "high"}),
"gpt-4.1": (False, None, None),
"gpt-4o-mini": (False, None, None),
"gpt-4o": (False, None, None),
}
# Effort levels ordered from lowest to highest; _clamp_effort() searches along this scale.
_EFFORT_SCALE = ["none", "low", "medium", "high", "xhigh"]
def _clamp_effort(effort, allowed):
"""Clamp effort to nearest valid level. Returns (value, was_clamped)."""
if allowed is None or effort in allowed:
return effort, False
idx = _EFFORT_SCALE.index(effort) if effort in _EFFORT_SCALE else 2
# Search outward: up first (higher effort is safer than lower)
for dist in range(1, len(_EFFORT_SCALE)):
for candidate_idx in [idx + dist, idx - dist]:
if 0 <= candidate_idx < len(_EFFORT_SCALE):
candidate = _EFFORT_SCALE[candidate_idx]
if candidate in allowed:
return candidate, True
return effort, False # shouldn't happen
def _model_capabilities(model):
    """Return the optional keyword arguments for responses.create() for ``model``.

    Capabilities come from the longest _MODEL_CAPS prefix that ``model``
    starts with; an unrecognised model is assumed to support reasoning with
    no default verbosity and no effort restrictions. The result may contain
    a "reasoning" entry (effort clamped to what the model allows) and a
    "text" entry (verbosity), or be empty.
    """
    matching = [prefix for prefix in _MODEL_CAPS if model.startswith(prefix)]
    if matching:
        supports_reasoning, default_verbosity, allowed_efforts = _MODEL_CAPS[
            max(matching, key=len)
        ]
    else:
        supports_reasoning, default_verbosity, allowed_efforts = True, None, None

    kwargs = {}
    if not supports_reasoning:
        return kwargs

    effort, clamped = _clamp_effort(EFFORT, allowed_efforts)
    if clamped:
        print(f" WARNING: effort '{EFFORT}' not supported by {model}, using '{effort}'")
    kwargs["reasoning"] = {"effort": effort}

    # An explicit VERBOSITY setting takes precedence over the model default.
    verbosity = VERBOSITY or default_verbosity
    if verbosity and verbosity != "none":
        kwargs["text"] = {"verbosity": verbosity}
    return kwargs
# Maximum seconds call_llm() polls a single background response before raising.
POLL_TIMEOUT = int(os.environ.get("POLL_TIMEOUT", "1800")) # 30 min default
def get_client():
    """Return an OpenAI client built from OPENAI_API_KEY.

    Exits with status 1 (after printing guidance) when the openai package is
    not importable or the API key environment variable is unset/empty.
    """
    try:
        from openai import OpenAI
    except ImportError:
        print("ERROR: openai not installed. Run: pip install openai")
        sys.exit(1)

    api_key = os.environ.get("OPENAI_API_KEY")
    if api_key:
        return OpenAI(api_key=api_key)

    print("ERROR: OPENAI_API_KEY not set.")
    print(" Set in .env or: export OPENAI_API_KEY='sk-...'")
    sys.exit(1)
def call_llm(client, instructions, user_input, label="", retries=3):
    """Call the OpenAI Responses API in background mode and poll for the result.

    Args:
        client: OpenAI client exposing ``responses.create`` / ``responses.retrieve``.
        instructions: System instructions for the request.
        user_input: The user prompt text.
        label: Optional short description printed with progress output.
        retries: Maximum number of attempts before giving up.

    Returns:
        The response's ``output_text``, or None after ``retries`` failed attempts.

    Behaviour notes:
        * Failures are retried with exponential backoff (2s, 4s, ...).
        * The first BadRequestError drops the optional reasoning/text kwargs
          and retries immediately without consuming an attempt.
        * A response that ends "incomplete" (e.g. output token limit reached)
          still returns its partial text, but a warning is printed so a
          truncated document is not mistaken for a complete one.
        * "failed" and "cancelled" responses are treated as attempt failures.
    """
    model_kwargs = _model_capabilities(MODEL)
    attempt = 0
    stripped = False
    while attempt < retries:
        try:
            if label:
                print(f" -> {MODEL} (effort={EFFORT}): {label}...")
            # Create background response
            resp = client.responses.create(
                model=MODEL,
                background=True,
                store=True,
                **model_kwargs,
                max_output_tokens=MAX_OUTPUT,
                instructions=instructions,
                input=user_input,
            )
            # Poll until the response leaves the queued/in_progress states.
            poll_count = 0
            poll_start = time.time()
            while resp.status in ("queued", "in_progress"):
                elapsed = time.time() - poll_start
                if elapsed > POLL_TIMEOUT:
                    raise Exception(f"Polling timeout after {int(elapsed)}s (limit={POLL_TIMEOUT}s)")
                time.sleep(3)
                resp = client.responses.retrieve(resp.id)
                poll_count += 1
                if poll_count % 10 == 0:
                    print(f" still running... ({poll_count * 3}s)")
            if resp.status in ("failed", "cancelled"):
                print(f" API returned status={resp.status}")
                if getattr(resp, "error", None):
                    print(f" error: {resp.error}")
                raise Exception(f"Response failed: {resp.status}")
            text = resp.output_text
            if not text:
                raise Exception("Empty output_text returned")
            if resp.status == "incomplete":
                # Partial output is kept (best effort) but must not pass silently.
                details = getattr(resp, "incomplete_details", None)
                reason = getattr(details, "reason", None) or "unknown"
                print(f" WARNING: response incomplete (reason={reason}); output may be truncated")
            # Log usage if available
            usage = getattr(resp, "usage", None)
            if usage:
                inp = getattr(usage, "input_tokens", "?")
                out = getattr(usage, "output_tokens", "?")
                print(f" tokens: {inp} in / {out} out")
            return text
        except Exception as e:
            # One-shot: strip reasoning/text kwargs on BadRequestError.
            # The import is guarded so a failure here cannot mask the real error.
            try:
                from openai import BadRequestError
            except ImportError:
                BadRequestError = ()
            if isinstance(e, BadRequestError) and not stripped:
                model_kwargs.pop("reasoning", None)
                model_kwargs.pop("text", None)
                stripped = True
                print(" WARNING: bad request, retrying without reasoning/text params")
                continue  # don't consume an attempt
            wait = 2 ** (attempt + 1)
            print(f" ERROR ({attempt+1}/{retries}): {e}")
            if attempt < retries - 1:
                print(f" retry in {wait}s...")
                time.sleep(wait)
            attempt += 1
    print(f" FAILED after {retries} attempts")
    return None
# ---------------------------------------------------------------------------
# PROMPTS
# ---------------------------------------------------------------------------
def load_prompt(name):
    """Return the text of prompts/{name}.md, exiting with status 1 if it is missing."""
    template_path = PROMPTS / f"{name}.md"
    if template_path.exists():
        return template_path.read_text(encoding="utf-8")
    print(f"ERROR: {template_path} not found")
    sys.exit(1)
def syllabus_prompt(run):
    """Fill the syllabus template for one run.

    ``run`` is a dict with "mode", "core" and "frontier" entries. Placeholders
    are substituted in a fixed order: run-specific values, role values, count
    values, and finally the domain content markers.
    """
    template = load_prompt("syllabus")
    substitutions = [
        # Run-specific vars
        ("{MODE}", run["mode"]),
        ("{CORE_EPISODES}", run["core"]),
        ("{FRONTIER_DIGEST}", run["frontier"]),
        ("{AS_OF_OVERRIDE}", AS_OF),
        # Role vars
        ("{ROLE}", ROLE),
        ("{COMPANY}", COMPANY),
        ("{DOMAIN}", DOMAIN),
        ("{AUDIENCE}", AUDIENCE),
        # Count vars
        ("{TOTAL_CORE}", str(_CORE_COUNT)),
        ("{CORE_RANGE}", f"1-{_CORE_COUNT}"),
        ("{FRONTIER_RANGE}", _frontier_range_str()),
        ("{FRONTIER_MAP}", _frontier_map_str()),
        ("{LISTENING_ORDER}", _listening_order_str()),
    ]
    for placeholder, value in substitutions:
        template = template.replace(placeholder, value)
    # Domain content injection comes last.
    return _inject_domain(template)
def content_prompt(agenda, notes=""):
    """Fill the content template for one episode agenda.

    str.replace is used rather than str.format because agenda text may hold
    literal braces. Role values and domain content go in before the
    user-supplied notes and agenda, so placeholders that happen to appear
    inside user content are not themselves substituted by those steps.
    """
    template = load_prompt("content")
    for placeholder, value in (
        ("{ROLE}", ROLE),
        ("{COMPANY}", COMPANY),
        ("{AS_OF_DATE}", AS_OF),
    ):
        template = template.replace(placeholder, value)
    # Domain content: after role vars, before user content.
    template = _inject_domain(template)
    extra = notes if notes else "- No additional notes."
    template = template.replace("{EXTRA_NOTES}", extra)
    return template.replace("{EPISODE_AGENDA}", agenda)
def distill_prompt(raw):
    """Fill the distill template for a raw source document.

    str.replace is used because the raw document may hold literal braces.
    Role values and domain content go in before the document itself, so
    placeholders inside the document are left untouched by those steps.
    """
    template = load_prompt("distill")
    for placeholder, value in (
        ("{ROLE}", ROLE),
        ("{COMPANY}", COMPANY),
        ("{DOMAIN}", DOMAIN),
    ):
        template = template.replace(placeholder, value)
    # Domain content: after role vars, before user content.
    template = _inject_domain(template)
    return template.replace("{RAW_DOCUMENT}", raw)
def render_template(text):
    """Substitute the {PREP_*} and {AS_OF_DATE} placeholders, then domain markers.

    Values come from the current module-level role configuration.
    """
    rendered = text
    for placeholder, value in (
        ("{PREP_ROLE}", ROLE),
        ("{PREP_COMPANY}", COMPANY),
        ("{PREP_DOMAIN}", DOMAIN),
        ("{PREP_AUDIENCE}", AUDIENCE),
        ("{AS_OF_DATE}", AS_OF),
    ):
        rendered = rendered.replace(placeholder, value)
    # Domain content (for gem.md etc.)
    return _inject_domain(rendered)
def _syllabus_instructions():
    """Return the system instructions for syllabus runs, built from the current ROLE/COMPANY."""
    persona = f"You are a {ROLE} at {COMPANY} acting as an expert interview coach."
    directives = "Follow the prompt instructions exactly. Output ONLY what the MODE asks for."
    return " ".join((persona, directives))
def _content_instructions():
    """Return the system instructions for content runs, built from the current ROLE/COMPANY."""
    persona = f"You are a {ROLE} at {COMPANY} acting as an expert interview coach."
    task = f"Generate a dense, {ROLE}-level technical content document."
    output_rule = "Output ONLY the content document."
    return " ".join((persona, task, output_rule))
def _distill_instructions():
    """Return the system instructions for distillation runs, built from the current ROLE/COMPANY."""
    persona = f"You are a {ROLE} at {COMPANY} acting as an expert interview coach."
    task = "Distill the provided document into an interview prep episode agenda."
    output_rule = "Output ONLY the agenda."
    return " ".join((persona, task, output_rule))
# ---------------------------------------------------------------------------
# PARSING
# ---------------------------------------------------------------------------
def parse_agendas(text):
    """Split syllabus output into per-episode agendas. Returns {ep_num: text}.

    A heading is a line that starts with "Episode N" or "Frontier Digest X",
    optionally preceded by markdown/numbering prefixes (##, **, "1) ", ...).
    Each agenda runs from its heading to the next heading (or end of text).
    Frontier letters are translated to episode numbers via frontier_map();
    letters with no mapping are ignored. If an episode appears more than
    once, the last occurrence wins.

    The digest letter must be a standalone letter: a word boundary is
    required after it so that prose such as "Frontier Digest and ..." is not
    misread as Digest A under case-insensitive matching.
    """
    heading = re.compile(
        r'^[\s*#\d\)\.]*(?:Episode\s+(\d+))|'
        r'^[\s*#\d\)\.]*(?:Frontier\s+Digest\s+([A-Z])\b)',
        re.MULTILINE | re.IGNORECASE
    )
    matches = list(heading.finditer(text))
    fmap = None  # resolved lazily, and only once, when a digest heading is seen
    result = {}
    for i, m in enumerate(matches):
        if m.group(1):
            ep = int(m.group(1))
        elif m.group(2):
            if fmap is None:
                fmap = frontier_map()
            letter = m.group(2).upper()
            if letter not in fmap:
                continue
            ep = fmap[letter]
        else:
            continue
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        result[ep] = text[m.start():end].strip()
    return result
# ---------------------------------------------------------------------------
# FILE HELPERS
# ---------------------------------------------------------------------------
def ensure_dirs():
    """Make sure every output and input directory exists (parents included)."""
    output_dirs = (SYLLABUS_DIR, EPISODES_DIR, GEM_DIR, NLM_DIR, RAW_DIR)
    input_dirs = (IN_AGENDAS, IN_EPISODES, IN_MISC)
    for directory in (*output_dirs, *input_dirs):
        directory.mkdir(parents=True, exist_ok=True)
def ep_file(ep, kind):
    """Build an episode filename such as 'episode-01-agenda.md' (number zero-padded to 2 digits)."""
    return "episode-{:02d}-{}.md".format(ep, kind)
def find_agenda(ep):
    """Return the path of the episode's agenda file, or None if there is none.

    The user-supplied inputs directory is checked before the generated
    syllabus directory, so a hand-written agenda takes precedence.
    """
    filename = ep_file(ep, "agenda")
    candidates = (folder / filename for folder in (IN_AGENDAS, SYLLABUS_DIR))
    return next((path for path in candidates if path.exists()), None)
def find_content(ep):
    """Return the path of the episode's content file, or None if there is none.

    The user-supplied inputs directory is checked before the generated
    episodes directory, so hand-written content takes precedence.
    """
    filename = ep_file(ep, "content")
    candidates = (folder / filename for folder in (IN_EPISODES, EPISODES_DIR))
    return next((path for path in candidates if path.exists()), None)
def _recover_from_pattern(pattern):
    """Re-parse raw syllabus files whose name contains ``pattern``; return how many agendas were written.

    An agenda is written only when no agenda file for that episode exists in
    either the generated syllabus directory or the inputs directory. Blank
    raw files are skipped.
    """
    recovered = 0
    for raw_path in sorted(RAW_DIR.glob(f"syllabus-*-{pattern}*.md")):
        raw_text = raw_path.read_text(encoding="utf-8")
        if not raw_text.strip():
            continue
        for ep, agenda in parse_agendas(raw_text).items():
            filename = ep_file(ep, "agenda")
            target = SYLLABUS_DIR / filename
            if target.exists() or (IN_AGENDAS / filename).exists():
                continue  # never overwrite an existing agenda
            target.write_text(agenda, encoding="utf-8")
            print(f" recovered {target.name} from {raw_path.name}")
            recovered += 1
    return recovered
def recover_agendas_from_raw():
    """Rebuild missing agenda files from saved raw syllabus outputs; return the number recovered.

    Core-batch raw files are processed before frontier-digest ones.
    """
    recovered = sum(
        _recover_from_pattern(kind) for kind in ("core_batch", "frontier_digest")
    )
    if recovered:
        print(f" recovered {recovered} agendas from raw files\n")
    return recovered
# ---------------------------------------------------------------------------
# COMMANDS
# ---------------------------------------------------------------------------
def _syllabus_skip_plan(run):
    """Return (reason, paths) if this run's outputs already exist on disk, else None."""
    mode = run["mode"]
    if mode in ("SCAFFOLD", "FINAL_MERGE"):
        stem = mode.lower()
        saved = SYLLABUS_DIR / f"{stem}.md"
        if saved.exists():
            return f"{stem} exists", [saved]
    elif mode == "CORE_BATCH":
        bounds = run["core"].split("-")
        episodes = range(int(bounds[0]), int(bounds[-1]) + 1)
        if all(find_agenda(n) for n in episodes):
            return "agendas exist", [find_agenda(n) for n in episodes]
    elif mode == "FRONTIER_DIGEST":
        ep = frontier_map()[run["frontier"]]
        if find_agenda(ep):
            return "agenda exists", [find_agenda(ep)]
    return None


def cmd_syllabus(client, force=False):
    """Run every syllabus pass in SYLLABUS_RUNS order. Returns True on success.

    Runs whose outputs already exist are skipped unless ``force`` is set; the
    existing files are still fed forward as context. Each generated response
    is saved raw, appended to the context for later runs, and then stored as
    per-episode agendas (batch/digest runs) or as scaffold.md / final_merge.md.
    Returns False as soon as one LLM call fails.
    """
    total_runs = len(SYLLABUS_RUNS)
    print(f"\n=== SYLLABUS ({total_runs} runs) ===\n")
    if force:
        print(" (--force: regenerating all)\n")
    recover_agendas_from_raw()

    history = []  # outputs of earlier runs, passed along as context
    for num, run in enumerate(SYLLABUS_RUNS, start=1):
        mode = run["mode"]
        tag = f"Run {num}/{total_runs}: {mode}"
        if run["core"]:
            tag += f" ({run['core']})"
        if run["frontier"]:
            tag += f" (Digest {run['frontier']})"

        # Skip if outputs exist (unless --force), but keep them as context.
        if not force:
            plan = _syllabus_skip_plan(run)
            if plan is not None:
                reason, paths = plan
                print(f" skip {tag} - {reason}")
                history.extend(path.read_text(encoding="utf-8") for path in paths)
                continue

        # Build the input, embedding prior outputs ahead of the current prompt.
        prompt = syllabus_prompt(run)
        if history:
            user_input = (
                "\n\n---\nPRIOR SYLLABUS OUTPUTS (for context continuity):\n---\n\n"
                + "\n\n---\n\n".join(history)
                + "\n\n---\nCURRENT RUN:\n---\n\n"
                + prompt
            )
        else:
            user_input = prompt

        resp = call_llm(client, _syllabus_instructions(), user_input, label=tag)
        if not resp:
            print(f" FAIL {tag}")
            return False

        raw_path = RAW_DIR / f"syllabus-{num:02d}-{mode.lower()}.md"
        raw_path.write_text(resp, encoding="utf-8")
        history.append(resp)

        if mode in ("CORE_BATCH", "FRONTIER_DIGEST"):
            parsed = parse_agendas(resp)
            if not parsed:
                print(f" WARNING: parse_agendas found 0 episodes in {tag} output!")
                print(f" Raw output saved to {raw_path}")
                print(f" Check format: expected '## Episode N:' or '## Frontier Digest A/B/C:'")
            for ep, agenda in parsed.items():
                agenda_path = SYLLABUS_DIR / ep_file(ep, "agenda")
                agenda_path.write_text(agenda, encoding="utf-8")
                print(f" saved {agenda_path.name}")
        if mode in ("SCAFFOLD", "FINAL_MERGE"):
            merged_path = SYLLABUS_DIR / f"{mode.lower()}.md"
            merged_path.write_text(resp, encoding="utf-8")
            print(f" saved {merged_path.name}")
        print(f" done {tag}")

    print("\n=== SYLLABUS COMPLETE ===\n")
    return True
def _print_syllabus_review(profile_name):
    """Print the manual review checklist shown after a standalone syllabus run."""
    episode_total = len(ALL_EPS)
    checklist = [
        "Review before running content generation:",
        "",
        f" [ ] Episode count matches expectations ({episode_total} episodes)",
        " [ ] Topics cover JD requirements (cross-reference with domain/coverage.md)",
        " [ ] No duplicate topics across episodes",
        " [ ] No obvious domain gaps",
        " [ ] Frontier digests cover emerging/advanced topics",
        " [ ] Mental models are distinct (not variations of the same idea)",
        "",
        f"Satisfied? Run: python3 prep.py content --profile {profile_name}",
        f"To regenerate: python3 prep.py syllabus --profile {profile_name} --force",
    ]
    for line in checklist:
        print(line)
def cmd_content(client, force=False, episode=None):
    """Generate a content document per episode from its agenda. Returns True if nothing failed.

    With ``episode`` set, only that episode is processed; otherwise all
    configured episodes are. Existing content is kept unless ``force`` is set
    or the existing file holds fewer than 500 characters. Episodes without a
    usable agenda are counted as warnings, and failed LLM calls as failures.
    """
    print("\n=== CONTENT GENERATION ===\n")
    if force:
        print(" (--force: regenerating all)\n")
    recover_agendas_from_raw()

    generated = skipped = warned = failed = 0
    targets = ALL_EPS if episode is None else [episode]
    for ep in targets:
        existing = find_content(ep)
        if existing and not force:
            if len(existing.read_text(encoding="utf-8").strip()) >= 500:
                print(f" skip ep {ep:02d} - content exists")
                skipped += 1
                continue
            # Suspiciously small file: fall through and regenerate it.
            print(f" warn ep {ep:02d} - content file too small ({existing}), regenerating")

        agenda_path = find_agenda(ep)
        if not agenda_path:
            print(f" warn ep {ep:02d} - no agenda")
            warned += 1
            continue
        agenda_text = agenda_path.read_text(encoding="utf-8").strip()
        if not agenda_text:
            print(f" warn ep {ep:02d} - agenda file is empty ({agenda_path})")
            warned += 1
            continue

        resp = call_llm(
            client,
            _content_instructions(),
            content_prompt(agenda_text),
            label=f"Episode {ep:02d}",
        )
        if not resp:
            print(f" FAIL ep {ep:02d}")
            failed += 1
            continue

        # Save the episode file plus an untouched raw copy.
        (EPISODES_DIR / ep_file(ep, "content")).write_text(resp, encoding="utf-8")
        (RAW_DIR / ep_file(ep, "content-raw")).write_text(resp, encoding="utf-8")
        print(f" saved ep {ep:02d} ({len(resp):,} chars)")
        generated += 1

    print(f"\n=== CONTENT: {generated} generated, {skipped} skipped, {failed} failed ===\n")
    if generated == 0 and warned > 0:
        print(f" WARNING: {warned} episode(s) had no agenda.")
        print(f" Run syllabus first: python3 prep.py syllabus --profile <name>\n")
    if failed > 0:
        print(f" WARNING: {failed} episode(s) failed. Re-run to retry.\n")
    return failed == 0
def cmd_package():
    """Package episode content for NotebookLM (one file each) and Gem (merged).

    Collects content for every episode in ``ALL_EPS`` plus a buffer of
    episode numbers past the configured count, along with any
    ``misc-*-content.md`` files in ``EPISODES_DIR``. Each item is copied
    individually into ``NLM_DIR``; episodes are merged into ``gem-<slot>.md``
    files according to ``gem_slot`` and misc documents go into the last slot.
    ``scaffold.md`` / ``final_merge.md`` are copied as ``gem-0-*`` when they
    exist. Returns False when there is nothing to package, True otherwise.
    """
    print("\n=== PACKAGING ===\n")
    slot_count = _total_gem_slots()

    # Probe the configured episodes plus a buffer of numbers beyond them so
    # that extra episodes outside the configured range are picked up too.
    configured = len(ALL_EPS)
    candidates = ALL_EPS + list(range(configured + 1, configured + 15))
    episode_text = {}
    for ep in candidates:
        found = find_content(ep)
        if found:
            episode_text[ep] = found.read_text(encoding="utf-8")

    misc_paths = sorted(EPISODES_DIR.glob("misc-*-content.md"))
    if not (episode_text or misc_paths):
        print(" No content found.")
        return False

    # NotebookLM: one file per episode / misc document.
    for ep, text in episode_text.items():
        (NLM_DIR / ep_file(ep, "content")).write_text(text, encoding="utf-8")
    for misc in misc_paths:
        (NLM_DIR / misc.name).write_text(misc.read_text(encoding="utf-8"), encoding="utf-8")
    print(f" NotebookLM: {len(episode_text) + len(misc_paths)} files")

    # Gem: group labelled documents per slot, then write one merged file each.
    slots = {number: [] for number in range(1, slot_count + 1)}
    for ep in sorted(episode_text):
        slots[gem_slot(ep)].append((f"EPISODE {ep}", episode_text[ep]))
    for misc in misc_paths:
        slots[slot_count].append((f"MISC: {misc.stem}", misc.read_text(encoding="utf-8")))

    rule = "=" * 60
    for slot, entries in slots.items():
        if not entries:
            continue
        sections = [f"{rule}\n{label}\n{rule}\n\n{text}" for label, text in entries]
        (GEM_DIR / f"gem-{slot}.md").write_text("\n\n".join(sections), encoding="utf-8")
        print(f" gem-{slot}.md: {', '.join(label for label, _ in entries)}")

    # Copy scaffold/merge alongside the gem files for reference.
    for ref_name in ("scaffold.md", "final_merge.md"):
        ref_path = SYLLABUS_DIR / ref_name
        if ref_path.exists():
            (GEM_DIR / f"gem-0-{ref_name}").write_text(
                ref_path.read_text(encoding="utf-8"), encoding="utf-8"
            )

    print("\n=== PACKAGE COMPLETE ===")
    print(f" NotebookLM -> {NLM_DIR}/")
    print(f" Gem -> {GEM_DIR}/\n")
    return True
def cmd_add(client, filepath, slot=None):
    """Distill an external document into an episode and append it to a gem slot.

    Steps:
      1. Read ``filepath`` as UTF-8 text.
      2. Distill it into an agenda via the LLM; saved under ``SYLLABUS_DIR``.
      3. Expand the agenda into content via the LLM; saved under
         ``EPISODES_DIR`` and ``NLM_DIR``.
      4. Append the content, under a ``MISC:`` banner, to ``gem-<slot>.md``
         in ``GEM_DIR``.

    Args:
        client: LLM client handle, passed through to ``call_llm``.
        filepath: Path of the text document to add.
        slot: Gem slot number to append to. Any falsy value (``None`` or 0)
            selects the slot returned by ``_total_gem_slots()``.

    Returns:
        True on success. False when the path is missing, is not a regular
        file, cannot be read, is not valid UTF-8, contains only whitespace,
        or when either LLM call returns nothing.
    """
    slot = slot or _total_gem_slots()
    print(f"\n=== ADD: {filepath} -> gem-{slot} ===\n")

    src = Path(filepath)
    if not src.exists():
        print(f"ERROR: {filepath} not found")
        return False
    # A directory (or other non-file) passes exists() but cannot be read as
    # text; report it instead of letting read_text() raise.
    if not src.is_file():
        print(f"ERROR: {filepath} is not a regular file")
        return False
    try:
        raw = src.read_text(encoding="utf-8")
    except (UnicodeDecodeError, ValueError):
        print(f"ERROR: {filepath} is not valid UTF-8 text.")
        print(" The 'add' command requires text files (.md, .txt, .html).")
        return False
    except OSError as exc:
        # Permission problems, files vanishing mid-run, etc.
        print(f"ERROR: could not read {filepath}: {exc}")
        return False
    # Do not spend an LLM call distilling an empty document.
    if not raw.strip():
        print(f"ERROR: {filepath} is empty; nothing to distill.")
        return False

    # Sanitize the stem so it is safe to embed in generated file names.
    name = re.sub(r'[^a-zA-Z0-9_-]', '_', src.stem)[:50]

    # Step 1: Distill
    print(" 1. Distill -> Agenda")
    agenda = call_llm(client, _distill_instructions(), distill_prompt(raw), "Distill")
    if not agenda:
        return False
    (SYLLABUS_DIR / f"misc-{name}-agenda.md").write_text(agenda, encoding="utf-8")

    # Step 2: Content
    print(" 2. Agenda -> Content")
    cont = call_llm(client, _content_instructions(), content_prompt(agenda), "Content")
    if not cont:
        return False
    (EPISODES_DIR / f"misc-{name}-content.md").write_text(cont, encoding="utf-8")
    (NLM_DIR / f"misc-{name}-content.md").write_text(cont, encoding="utf-8")

    # Step 3: Append to gem (append mode creates the file if it is missing).
    gem_path = GEM_DIR / f"gem-{slot}.md"
    sep = f"\n\n{'='*60}\nMISC: {src.name}\n{'='*60}\n\n"
    with open(gem_path, "a", encoding="utf-8") as f:
        f.write(sep + cont)
    print(f" Appended to gem-{slot}.md")
    print("\n=== ADD COMPLETE ===\n")
    return True
# ---------------------------------------------------------------------------
# SETUP COMMAND (generate domain files via API)
# ---------------------------------------------------------------------------
def _setup_instructions():
    """Return the system instructions shared by the setup (domain-file) calls."""
    persona = f"You are a {ROLE} at {COMPANY} acting as an expert interview coach."
    task = "Generate the requested domain-specific content sections exactly as specified."
    output_rule = (
        "Output ONLY the sections with their marker comments"
        " — no preamble, no explanations."
    )
    # The three sentences are joined with single spaces into one paragraph.
    return " ".join((persona, task, output_rule))
def _gather_context_docs():
    """Concatenate the ``.md`` / ``.txt`` files found directly in ``IN_MISC``.

    Each file is emitted as a ``--- <name> ---`` header followed by its UTF-8
    text, in sorted path order, with blank lines between files. A placeholder
    sentence is returned when ``IN_MISC`` is not a directory or holds no
    matching files.
    """
    fallback = "(No additional context documents provided.)"
    if not IN_MISC.is_dir():
        return fallback

    sections = [
        f"--- {doc.name} ---\n{doc.read_text(encoding='utf-8')}"
        for doc in sorted(IN_MISC.iterdir())
        if doc.suffix in (".md", ".txt")
    ]
    if not sections:
        return fallback
    return "\n\n".join(sections)
def _write_domain_file(domain_dir, filename, markers, parsed):
    """Write ``filename`` in ``domain_dir`` from the parsed marker sections.

    For every entry of ``markers`` that is a key of ``parsed``, an
    ``<!-- MARKER -->`` comment line, the section text and a blank line are
    emitted, in ``markers`` order. A warning is printed for markers that are
    absent from ``parsed``. Returns False without writing anything when none
    of the markers is present, True after writing the file otherwise.
    """
    absent = [m for m in markers if m not in parsed]
    if absent:
        print(f" WARNING: {filename} missing markers: {', '.join(absent)}")

    present = [m for m in markers if m in parsed]
    if not present:
        print(f" WARNING: {filename} has no recognized markers, skipping")
        return False

    chunks = []
    for marker in present:
        chunks.extend((f"<!-- {marker} -->", parsed[marker], ""))

    # Trim surrounding whitespace and end the file with exactly one newline.
    text = "\n".join(chunks).strip() + "\n"
    (domain_dir / filename).write_text(text, encoding="utf-8")
    return True
def _build_setup_prompt(prompt_name, profile_text, **extra):
    """Load a meta-prompt template and fill in its ``{PLACEHOLDER}`` tokens.

    The role/company/domain/audience globals and ``profile_text`` are
    substituted first, followed by each ``extra`` keyword as ``{KEY}`` ->
    value. Replacements are applied one after another, in that order.
    """
    text = load_prompt(prompt_name)
    substitutions = [
        ("{ROLE}", ROLE),
        ("{COMPANY}", COMPANY),
        ("{DOMAIN}", DOMAIN),
        ("{AUDIENCE}", AUDIENCE),
        ("{PROFILE_CONTENT}", profile_text),
    ]
    substitutions.extend(("{" + key + "}", value) for key, value in extra.items())
    for placeholder, replacement in substitutions:
        text = text.replace(placeholder, replacement)
    return text
def cmd_setup(client, profile_name, force=False):
"""Generate domain files via 3 API calls."""
print(f"\n=== SETUP: {profile_name} (3 calls) ===\n")
profile_dir = BASE_DIR / "profiles" / profile_name
domain_dir = profile_dir / "domain"
domain_dir.mkdir(parents=True, exist_ok=True)
# Check if domain files already exist (skip unless --force)
if not force:
existing = [f for f in _DOMAIN_FILES if not _is_stub(domain_dir / f)]
if existing:
print(f" Domain files already exist: {', '.join(existing)}")
print(f" Use --force to regenerate.")
return True
# Read profile text + context docs
profile_text = (profile_dir / "profile.md").read_text(encoding="utf-8")
context_docs = _gather_context_docs()
# Call 1: meta-seeds -> seeds.md + coverage.md
print(" 1/3: Generating seeds + coverage framework...")
prompt1 = _build_setup_prompt("meta-seeds", profile_text,
CONTEXT_DOCS=context_docs)
resp1 = call_llm(client, _setup_instructions(), prompt1, label="Seeds + Coverage")
if not resp1:
print(" FAILED: call 1 (seeds + coverage)")
return False
(RAW_DIR / f"setup-1-seeds.md").write_text(resp1, encoding="utf-8")
parsed1 = _parse_domain_sections(resp1)
_write_domain_file(domain_dir, "seeds.md", ["DOMAIN_SEEDS"], parsed1)
_write_domain_file(domain_dir, "coverage.md", ["COVERAGE_FRAMEWORK"], parsed1)
# Call 2: meta-lenses -> lenses.md
print(" 2/3: Generating lenses...")
prompt2 = _build_setup_prompt("meta-lenses", profile_text)
resp2 = call_llm(client, _setup_instructions(), prompt2, label="Lenses")
if not resp2:
print(" FAILED: call 2 (lenses)")