From 0daefd3288c3d5e89404fa1e587f61d3a972db9a Mon Sep 17 00:00:00 2001 From: NotYuSheng Date: Fri, 26 Jun 2026 00:33:23 +0800 Subject: [PATCH 1/6] fix(sharegpt): coerce conversations to LLaMA-Factory's alternation contract to_sharegpt only checked that both human and gpt roles were present, but LLaMA-Factory additionally requires each conversation to start with human, strictly alternate, and end with gpt. Real chats violate this constantly (other person messages first; same-role runs survive reply-stitching / multi-speaker), so ~72% of samples were silently dropped at train time (3527 written -> 997 trained) with only WARNING logs. Add _coerce_alternating: merge consecutive same-speaker turns (labels stay in the text), strip a leading gpt turn, strip a trailing human turn. Salvages human,gpt,human -> human,gpt instead of discarding the whole sample. On the same export this raises usable samples from 997 to ~3324. Fixes #42 Co-Authored-By: Claude Opus 4.8 --- ingest/sharegpt.py | 46 ++++++++++++++++++++---- tests/test_ingest.py | 83 +++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 117 insertions(+), 12 deletions(-) diff --git a/ingest/sharegpt.py b/ingest/sharegpt.py index 08d8e8b..89be550 100644 --- a/ingest/sharegpt.py +++ b/ingest/sharegpt.py @@ -16,11 +16,45 @@ } +def _coerce_alternating(conversations: list) -> list: + """Coerce a ``{"from", "value"}`` turn list into the shape LLaMA-Factory's + ShareGPT converter accepts: starts with ``human``, strictly alternates + ``human``/``gpt``, and ends with ``gpt`` (even number of turns). + + LLaMA-Factory enforces ``messages[i]`` is human for even ``i`` and gpt for + odd ``i``, and rejects odd-length conversations outright. Raw chats break + both rules constantly (the other person messages first; same-speaker runs + survive reply-stitching / multi-speaker labelling), so without this they are + silently dropped at train time. 
We salvage them instead of discarding: + + - merge consecutive same-speaker turns (labels stay embedded in the text), + - drop leading ``gpt`` turns so it starts on ``human``, + - drop the trailing ``human`` turn so it ends on ``gpt``. + + Returns ``[]`` if no ``human -> gpt`` pair survives. + """ + merged: list = [] + for turn in conversations: + if merged and merged[-1]["from"] == turn["from"]: + merged[-1]["value"] = f"{merged[-1]['value']}\n{turn['value']}" + else: + merged.append(dict(turn)) + + while merged and merged[0]["from"] != "human": + merged.pop(0) + while merged and merged[-1]["from"] != "gpt": + merged.pop() + + return merged if len(merged) >= 2 else [] + + def to_sharegpt(samples: List[Sample]) -> list: """Convert role/text samples to ShareGPT ``{"conversations": [...]}`` records. - Drops empty turns and keeps only samples that contain both a human and a gpt - turn (LLaMA-Factory needs both sides to train on). + Drops empty turns, then coerces each sample into the strictly-alternating + ``human -> gpt -> ...`` shape LLaMA-Factory requires (see + :func:`_coerce_alternating`). Samples with no usable ``human -> gpt`` pair + are dropped. 
""" output = [] for turns in samples: @@ -31,11 +65,9 @@ def to_sharegpt(samples: List[Sample]) -> list: if speaker and value: conversations.append({"from": speaker, "value": value}) - roles_present = {c["from"] for c in conversations} - if "human" not in roles_present or "gpt" not in roles_present: - continue - - output.append({"conversations": conversations}) + conversations = _coerce_alternating(conversations) + if conversations: + output.append({"conversations": conversations}) return output diff --git a/tests/test_ingest.py b/tests/test_ingest.py index ca4ab30..3079cb6 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -78,7 +78,8 @@ def _write_fixture(dir_path): {"conversations": [ {"from": "human", "value": "hey there\nyou around?"}, {"from": "gpt", "value": "yeah whats up\nstill here"}, - {"from": "human", "value": "cool"}, + # trailing human turn ("cool") is trimmed so the sample ends on gpt + # (LLaMA-Factory rejects odd-length / trailing-human conversations). ]}, {"conversations": [ {"from": "human", "value": "later message"}, @@ -208,11 +209,12 @@ def test_default_collapses_other_side(self): def test_multi_speaker_labels_users_not_assistant(self): out = sharegpt.to_sharegpt(core.build_samples(self._group(), multi_speaker=True)) convs = out[0]["conversations"] - # Distinct speakers stay distinct and are labelled... - self.assertEqual(convs[0], {"from": "human", "value": "Bob: q1"}) - self.assertEqual(convs[1], {"from": "human", "value": "Carol: q2"}) + # Distinct speakers are labelled, then merged into one human turn so the + # conversation alternates human/gpt (LLaMA-Factory rejects consecutive + # same-role turns). The labels are preserved in the merged text... + self.assertEqual(convs[0], {"from": "human", "value": "Bob: q1\nCarol: q2"}) # ...but the owner's (assistant) turn is never labelled. 
- self.assertEqual(convs[2], {"from": "gpt", "value": "answer"}) + self.assertEqual(convs[1], {"from": "gpt", "value": "answer"}) class ShareGptTest(unittest.TestCase): @@ -234,6 +236,77 @@ def test_jsonl_roundtrip(self): self.assertEqual(sharegpt.load_jsonl_samples(p), samples) +class CoerceAlternatingTest(unittest.TestCase): + """LLaMA-Factory requires each conversation to start with human, strictly + alternate, and end with gpt. to_sharegpt must coerce raw chats into that + shape rather than emit samples that get silently dropped at train time. + """ + + def _conv(self, samples): + return sharegpt.to_sharegpt(samples)[0]["conversations"] + + def test_leading_assistant_turn_is_stripped(self): + # The other person messaged first -> drop the leading gpt turn. + samples = [[ + {"role": "assistant", "text": "u there?"}, + {"role": "user", "text": "ya"}, + {"role": "assistant", "text": "ok"}, + ]] + self.assertEqual(self._conv(samples), [ + {"from": "human", "value": "ya"}, + {"from": "gpt", "value": "ok"}, + ]) + + def test_trailing_user_turn_is_stripped(self): + # Conversation ends on a human turn (odd length) -> drop it so it ends gpt. + samples = [[ + {"role": "user", "text": "hi"}, + {"role": "assistant", "text": "yo"}, + {"role": "user", "text": "u free?"}, + ]] + self.assertEqual(self._conv(samples), [ + {"from": "human", "value": "hi"}, + {"from": "gpt", "value": "yo"}, + ]) + + def test_consecutive_same_role_turns_are_merged(self): + # Same-role runs (e.g. from reply-stitching) are merged, not left to + # break alternation. + samples = [[ + {"role": "user", "text": "a"}, + {"role": "user", "text": "b"}, + {"role": "assistant", "text": "c"}, + {"role": "assistant", "text": "d"}, + ]] + self.assertEqual(self._conv(samples), [ + {"from": "human", "value": "a\nb"}, + {"from": "gpt", "value": "c\nd"}, + ]) + + def test_no_usable_pair_is_dropped(self): + # A lone human turn (after trimming) leaves nothing trainable. 
+ self.assertEqual(sharegpt.to_sharegpt([[{"role": "user", "text": "hello?"}]]), []) + + def test_output_is_even_length_and_well_formed(self): + # Every emitted conversation must start human, end gpt, and alternate. + for record in sharegpt.to_sharegpt(EXPECTED_SAMPLES_FOR_SHAPE): + convs = record["conversations"] + self.assertEqual(len(convs) % 2, 0) + self.assertEqual(convs[0]["from"], "human") + self.assertEqual(convs[-1]["from"], "gpt") + expected = ["human", "gpt"] * (len(convs) // 2) + self.assertEqual([c["from"] for c in convs], expected) + + +# Messy inputs that all must come out well-formed. +EXPECTED_SAMPLES_FOR_SHAPE = [ + [{"role": "assistant", "text": "first"}, {"role": "user", "text": "u"}, + {"role": "assistant", "text": "me"}], + [{"role": "user", "text": "x"}, {"role": "user", "text": "y"}, + {"role": "assistant", "text": "z"}, {"role": "user", "text": "trailing"}], +] + + class ValidatorSplitTest(unittest.TestCase): def test_apply_split_cuts_after_indices(self): from ingest.validator import _apply_split From 38ed5b3339fef1d0c17cd51fc2b55e5b91a21137 Mon Sep 17 00:00:00 2001 From: NotYuSheng Date: Fri, 26 Jun 2026 00:33:23 +0800 Subject: [PATCH 2/6] feat(runner): step-based doppelganger CLI over the ingest + training pipeline Add `python -m doppelganger`: an interactive numbered menu (no args) plus named subcommands parse / audit / train / merge / chat, and an `auto` mode that runs parse -> audit end-to-end (like the old one-shot ingest, redact replace by default). parse/audit reuse ingest.*; train/merge/chat shell out to llamafactory-cli. The parse/audit split makes "review before training" a first-class step. 
Co-Authored-By: Claude Opus 4.8 --- doppelganger/__init__.py | 6 + doppelganger/__main__.py | 146 ++++++++++++++++++++++++ doppelganger/steps.py | 233 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 385 insertions(+) create mode 100644 doppelganger/__init__.py create mode 100644 doppelganger/__main__.py create mode 100644 doppelganger/steps.py diff --git a/doppelganger/__init__.py b/doppelganger/__init__.py new file mode 100644 index 0000000..7908bb3 --- /dev/null +++ b/doppelganger/__init__.py @@ -0,0 +1,6 @@ +"""Doppelganger orchestrator: a step-based runner over the ingest pipeline and +LLaMA-Factory training. + +Run ``python -m doppelganger`` for an interactive menu, or a named subcommand +(``parse``, ``audit``, ``train``, ``merge``, ``chat``, ``auto``). +""" diff --git a/doppelganger/__main__.py b/doppelganger/__main__.py new file mode 100644 index 0000000..401ee2a --- /dev/null +++ b/doppelganger/__main__.py @@ -0,0 +1,146 @@ +"""Doppelganger runner entry point. + + python -m doppelganger # interactive numbered menu + python -m doppelganger parse ... # named subcommand + python -m doppelganger auto # run the whole pipeline unattended +""" + +import argparse +import sys +from typing import List, Optional + +from ingest import banner +from ingest.adapters import available_sources + +from doppelganger import steps + + +def _add_parse_flags(p: argparse.ArgumentParser, with_redact: bool = False) -> None: + p.add_argument("--source", default="telegram", + help=f"Chat source. 
Supported: {', '.join(available_sources())}.") + p.add_argument("--input", dest="input_path", default=steps.DEFAULT_INPUT, + help="Path to the raw export.") + p.add_argument("--self-name", default=None, help="Override auto-detection of which sender is you.") + p.add_argument("--conversation-gap", type=int, default=3600, + help="Seconds of silence that start a new conversation.") + p.add_argument("--message-chain", type=int, default=30, + help="Max seconds between same-sender messages to merge into one turn.") + p.add_argument("--multi-speaker", action="store_true", + help="Keep and label individual senders in group chats.") + p.add_argument("--redact-locales", default=None, + help="Comma-separated locales for sensitive-data detection (e.g. SG,US).") + if with_redact: + p.add_argument("--redact", choices=["off", "replace", "drop"], default="replace", + help="What to do with detected sensitive data (default: replace).") + + +def _locales(value: Optional[str]) -> Optional[List[str]]: + if not value: + return None + return [s.strip() for s in value.split(",") if s.strip()] + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="doppelganger", + description="Step-based runner for the Doppelganger pipeline. 
" + "Run with no command for an interactive menu.", + ) + sub = parser.add_subparsers(dest="command") + + p = sub.add_parser("parse", help="Parse export → conversations + scan.") + _add_parse_flags(p) + p.add_argument("--skip-redact-scan", action="store_true", help="Skip the sensitive-data scan.") + + a = sub.add_parser("audit", help="Review scan → redact → dataset.") + a.add_argument("--redact", choices=["off", "replace", "drop"], default="replace", + help="What to do with detected sensitive data (default: replace).") + a.add_argument("--redact-locales", default=None, help="Comma-separated detection locales.") + a.add_argument("--skip-validation", action="store_true", help="Skip the optional LLM quality audit.") + + t = sub.add_parser("train", help="LoRA fine-tune on the dataset.") + t.add_argument("--config", default=None, help="Training config (default: configs/train_lora[.local].yaml).") + t.add_argument("--gpus", default=None, help="CUDA_VISIBLE_DEVICES, e.g. '0' or '0,1,2,3'.") + + m = sub.add_parser("merge", help="Merge the LoRA adapter into the base model.") + m.add_argument("--config", default=None, help="Export config (default: configs/export_lora[.local].yaml).") + m.add_argument("--gpus", default=None, help="CUDA_VISIBLE_DEVICES.") + + c = sub.add_parser("chat", help="Chat with the fine-tuned model.") + c.add_argument("--gpus", default=None, help="CUDA_VISIBLE_DEVICES, e.g. 
'0'.") + + au = sub.add_parser("auto", help="Run parse → audit end-to-end (optionally → train).") + _add_parse_flags(au, with_redact=True) + au.add_argument("--train", dest="do_train", action="store_true", help="Also run training after the dataset is built.") + au.add_argument("--gpus", default=None, help="CUDA_VISIBLE_DEVICES for training.") + return parser + + +def _dispatch(args: argparse.Namespace) -> int: + if args.command == "parse": + return steps.parse( + source=args.source, input_path=args.input_path, self_name=args.self_name, + conversation_gap=args.conversation_gap, message_chain=args.message_chain, + multi_speaker=args.multi_speaker, locales=_locales(args.redact_locales), + scan=not args.skip_redact_scan, + ) + if args.command == "audit": + return steps.audit(redact=args.redact, locales=_locales(args.redact_locales), + validate=not args.skip_validation) + if args.command == "train": + return steps.train(config=args.config, gpus=args.gpus) + if args.command == "merge": + return steps.merge(config=args.config, gpus=args.gpus) + if args.command == "chat": + return steps.chat(gpus=args.gpus) + if args.command == "auto": + return steps.auto( + do_train=args.do_train, redact=args.redact, gpus=args.gpus, + source=args.source, input_path=args.input_path, self_name=args.self_name, + conversation_gap=args.conversation_gap, message_chain=args.message_chain, + multi_speaker=args.multi_speaker, locales=_locales(args.redact_locales), + ) + return 2 + + +# ── Interactive menu ────────────────────────────────────────────────────────── +def _menu() -> int: + banner.print_banner() + while True: + print("\nDoppelganger — pick a step (q to quit):") + for i, (name, desc, done) in enumerate(steps.STEPS, 1): + mark = "✓" if done() else " " + print(f" [{mark}] {i}. {name:6} — {desc}") + print(" a. 
auto — run parse → audit end-to-end") + choice = input("> ").strip().lower() + + if choice in ("q", "quit", "exit", ""): + return 0 + if choice == "a": + steps.auto() + continue + if choice.isdigit() and 1 <= int(choice) <= len(steps.STEPS): + name = steps.STEPS[int(choice) - 1][0] + rc = { + "parse": steps.parse, "audit": steps.audit, "train": steps.train, + "merge": steps.merge, "chat": steps.chat, + }[name]() + if rc: + print(f"[{name}] exited with code {rc}.") + else: + print("Unrecognized choice.") + + +def main(argv: Optional[List[str]] = None) -> int: + steps.load_env() + argv = sys.argv[1:] if argv is None else argv + if not argv: + return _menu() + args = build_parser().parse_args(argv) + if args.command is None: + return _menu() + return _dispatch(args) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/doppelganger/steps.py b/doppelganger/steps.py new file mode 100644 index 0000000..5317249 --- /dev/null +++ b/doppelganger/steps.py @@ -0,0 +1,233 @@ +"""The individual pipeline steps the runner exposes. + +Each step is a thin wrapper over existing code: ``parse``/``audit`` reuse the +``ingest`` modules; ``train``/``merge``/``chat`` shell out to ``llamafactory-cli``. +Every step returns a process-style exit code (0 = success). 
+""" + +import os +import subprocess +from typing import List, Optional + +from ingest import banner, core, redactor, sharegpt +from ingest.adapters import available_sources, get_adapter +from ingest.cli import _load_dotenv +from ingest.validator import validate_samples + +# ── Canonical artifact paths (one source of truth) ─────────────────────────── +DATA_DIR = "data" +DEFAULT_INPUT = os.path.join(DATA_DIR, "result.json") +RAW_PATH = os.path.join(DATA_DIR, "chat_dataset.jsonl") # parse output +DATASET_PATH = os.path.join(DATA_DIR, "chat_sharegpt.json") # audit output (trains) +REPORT_PATH = os.path.join(DATA_DIR, "redaction_report.json") + + +def _resolve_config(stem: str) -> str: + """Prefer a local override (``*.local.yaml``) over the tracked default.""" + local = os.path.join("configs", f"{stem}.local.yaml") + return local if os.path.exists(local) else os.path.join("configs", f"{stem}.yaml") + + +def train_config() -> str: + return _resolve_config("train_lora") + + +def export_config() -> str: + return _resolve_config("export_lora") + + +def _read_yaml(path: str) -> dict: + import yaml + + with open(path, encoding="utf-8") as f: + return yaml.safe_load(f) or {} + + +def adapter_dir() -> Optional[str]: + """Where training writes the LoRA adapter (``output_dir`` in the config).""" + try: + return _read_yaml(train_config()).get("output_dir") + except OSError: + return None + + +# ── Status (for the menu's checkmarks / out-of-order guards) ────────────────── +def parse_done() -> bool: + return os.path.exists(RAW_PATH) + + +def audit_done() -> bool: + return os.path.exists(DATASET_PATH) + + +def train_done() -> bool: + out = adapter_dir() + return bool(out) and os.path.exists(os.path.join(out, "adapter_model.safetensors")) + + +# ── Steps ───────────────────────────────────────────────────────────────────── +def parse( + source: str = "telegram", + input_path: str = DEFAULT_INPUT, + self_name: Optional[str] = None, + conversation_gap: int = 3600, + message_chain: 
int = 30, + multi_speaker: bool = False, + locales: Optional[List[str]] = None, + scan: bool = True, +) -> int: + """Stage 1: export → conversations → (scan). Writes raw samples + report. + + No redaction is applied here — that's :func:`audit`. The raw ``.jsonl`` holds + unredacted text and is gitignored; treat it as sensitive. + """ + if source not in available_sources(): + print(f"error: source '{source}' not supported. Choose: {', '.join(available_sources())}") + return 2 + if not os.path.exists(input_path): + print(f"error: input not found: {input_path}\n" + f" Place your export there or pass --input.") + return 1 + + os.makedirs(DATA_DIR, exist_ok=True) + print(f"Loading {input_path} via '{source}' adapter...") + messages = get_adapter(source).parse(input_path, self_name=self_name) + + print("Building conversation samples...") + samples = core.build_samples( + messages, + conversation_gap=conversation_gap, + message_chain=message_chain, + multi_speaker=multi_speaker, + ) + print(f"Extracted {len(samples)} conversation samples.") + + if scan: + report = redactor.scan_samples(samples, locales=locales) + redactor.write_report(report, REPORT_PATH) + redactor.print_summary(report, REPORT_PATH, mode="off") + + written = sharegpt.write_jsonl(samples, RAW_PATH) + print(f"Wrote {written} raw conversation samples to {RAW_PATH}.") + print(f"Next: review {REPORT_PATH}, then run `audit`.") + return 0 + + +def audit( + redact: str = "replace", + locales: Optional[List[str]] = None, + validate: bool = True, +) -> int: + """Stage 2: review scan → redact → (optional LLM quality audit) → dataset. + + Consumes the raw ``.jsonl`` from :func:`parse` and writes the training-ready + ShareGPT dataset. 
+ """ + if not parse_done(): + print(f"error: {RAW_PATH} not found — run `parse` first.") + return 1 + + samples = sharegpt.load_jsonl_samples(RAW_PATH) + print(f"Loaded {len(samples)} conversation samples from {RAW_PATH}.") + + # Re-derive the scan summary so audit is self-contained (regex is cheap). + report = redactor.scan_samples(samples, locales=locales) + redactor.write_report(report, REPORT_PATH) + redactor.print_summary(report, REPORT_PATH, mode=redact) + + if redact != "off": + before = len(samples) + samples = redactor.apply(samples, redact, locales=locales) + print(f"[redactor] Applied --redact {redact}: {before} -> {len(samples)} samples.") + + if validate: + samples = validate_samples(samples) # self-disables if no LLM configured + + written = sharegpt.write_sharegpt(samples, DATASET_PATH) + print(f"Wrote {written} ShareGPT samples to {DATASET_PATH}.") + print("Next: run `train`.") + return 0 + + +def _llamafactory(args: List[str], gpus: Optional[str] = None) -> int: + env = dict(os.environ) + if gpus: + env["CUDA_VISIBLE_DEVICES"] = gpus + print(f"$ llamafactory-cli {' '.join(args)}" + + (f" (CUDA_VISIBLE_DEVICES={gpus})" if gpus else "")) + try: + return subprocess.run(["llamafactory-cli", *args], env=env).returncode + except FileNotFoundError: + print("error: llamafactory-cli not found. 
Activate the venv (and `pip install -r requirements.txt`).") + return 127 + + +def train(config: Optional[str] = None, gpus: Optional[str] = None) -> int: + """Stage 3: LoRA fine-tune via LLaMA-Factory.""" + if not audit_done(): + print(f"warning: {DATASET_PATH} not found — run `parse` + `audit` (or `auto`) first.") + return 1 + return _llamafactory(["train", config or train_config()], gpus=gpus) + + +def merge(config: Optional[str] = None, gpus: Optional[str] = None) -> int: + """Stage 4 (optional): merge the LoRA adapter into the base model.""" + return _llamafactory(["export", config or export_config()], gpus=gpus) + + +def chat(gpus: Optional[str] = None) -> int: + """Stage 5: chat with the fine-tuned model (base + adapter, nothing merged).""" + cfg = _read_yaml(train_config()) + out = cfg.get("output_dir") + if not out or not os.path.exists(os.path.join(out, "adapter_model.safetensors")): + print(f"warning: no trained adapter at {out!r} — run `train` first.") + return 1 + return _llamafactory([ + "chat", + "--model_name_or_path", cfg.get("model_name_or_path", ""), + "--adapter_name_or_path", out, + "--template", cfg.get("template", "default"), + "--finetuning_type", "lora", + "--infer_dtype", "bfloat16", + ], gpus=gpus) + + +def auto( + do_train: bool = False, + redact: str = "replace", + gpus: Optional[str] = None, + **parse_kwargs, +) -> int: + """Run the pipeline end-to-end, unattended (like the old one-shot ingest). + + parse → audit (redact, no review pause) → dataset. With ``do_train`` it also + chains training. Redaction defaults to ``replace`` so it never trains on raw + PII without an explicit opt-out (``redact='off'``). 
+ """ + banner.print_banner() + print("=== auto: running the full pipeline (no review pause) ===") + if redact == "off": + print("WARNING: --redact off — sensitive data will NOT be removed.") + rc = parse(**parse_kwargs) + if rc: + return rc + rc = audit(redact=redact, locales=parse_kwargs.get("locales")) + if rc: + return rc + if do_train: + rc = train(gpus=gpus) + return rc + + +# Ordered for the menu. +STEPS = [ + ("parse", "Parse export → conversations + sensitive-data scan", parse_done), + ("audit", "Review scan → redact → training-ready dataset", audit_done), + ("train", "LoRA fine-tune on the dataset", train_done), + ("merge", "Merge the LoRA adapter into the base model (optional)", lambda: False), + ("chat", "Chat with the fine-tuned model", lambda: False), +] + + +def load_env() -> None: + _load_dotenv() From 9e1347f5a79b1959c0fadb921110ea9290391a06 Mon Sep 17 00:00:00 2001 From: NotYuSheng Date: Fri, 26 Jun 2026 00:33:23 +0800 Subject: [PATCH 3/6] docs: split ingestion docs (shared pipeline + per-source); minor README/gitignore Separate the source-agnostic pipeline (docs/data-pipeline.md) from source-specific parsing (docs/sources/telegram.md), reflecting the adapter architecture so WhatsApp/Discord docs can slot in. Add README notes on the DataExport folder layout and the small default model. Ignore *.csv (review exports derived from chat data). 
Co-Authored-By: Claude Opus 4.8 --- .gitignore | 1 + README.md | 4 + docs/data-pipeline.md | 187 +++++++++++++++++++++++++++++++++++++++ docs/sources/telegram.md | 82 +++++++++++++++++ 4 files changed, 274 insertions(+) create mode 100644 docs/data-pipeline.md create mode 100644 docs/sources/telegram.md diff --git a/.gitignore b/.gitignore index f93c2fe..67cf7a3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ /venv *.json *.jsonl +*.csv /saves /LLaMA-Factory /merged diff --git a/README.md b/README.md index 17bbc15..d47d07d 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,8 @@ Doppelganger/ └── result.json ← place your export here ``` +> **Note:** Telegram exports unzip to a dated folder like `DataExport_2025-07-09/result.json`. Move (or copy) that `result.json` to `data/result.json` — `setup.sh` looks for it there. Alternatively, point `python -m ingest` at the file directly with `--input path/to/result.json`. + **2. Clone and run setup** The setup scripts create a virtual environment, install pinned dependencies (LLaMA-Factory **0.9.4**), create your `.env`, and process the export into `data/chat_sharegpt.json`. @@ -123,6 +125,8 @@ source venv/bin/activate llamafactory-cli train configs/train_lora.yaml ``` +> **Note:** The tracked `train_lora.yaml` defaults to the small **Qwen1.5-1.8B-Chat** so this step runs fast as an end-to-end smoke test. It's a real model but too small to convincingly mimic your writing — for real results, switch to a larger base (e.g. Qwen2.5-14B-Instruct) via a local override. See [Fine-Tune Your Model](#fine-tune-your-model-lora). + ## Usage `python -m ingest` turns a raw export into a training-ready dataset. 
Useful flags: diff --git a/docs/data-pipeline.md b/docs/data-pipeline.md new file mode 100644 index 0000000..9fe5d90 --- /dev/null +++ b/docs/data-pipeline.md @@ -0,0 +1,187 @@ +# The ingestion pipeline (source-agnostic) + +This document describes every transformation Doppelganger applies to turn a raw +chat export into a training-ready dataset. The pipeline is **source-agnostic**: +a per-platform *adapter* normalizes the export into a common message stream, and +**every stage after that is shared across all sources**. + +The only source-specific step is stage 1 (the adapter). For how a particular +platform's export is parsed, see the per-source docs: + +- [Telegram](sources/telegram.md) — supported today +- WhatsApp, Discord, … — planned; each drops in under [`docs/sources/`](sources/) + +Entry point: [`python -m ingest`](../ingest/__main__.py) → +[`ingest/cli.py:main`](../ingest/cli.py). End-to-end flow: + +``` +export (platform-specific) + │ (1) ADAPTER PARSE ingest/adapters/<source>.py ← source-specific + ▼ +NormalizedMessage stream ← common interface; everything below is shared + │ (2) BUILD SAMPLES ingest/core.py + │ a. split by silence gap + │ b. stitch reply-linked splits + │ c. assemble + merge turns + ▼ +role/text conversation samples + │ (3) SENSITIVE-DATA SCAN ingest/redactor.py (+ optional LLM) + │ (4) REDACTION APPLY off | replace | drop + │ (5) LLM QUALITY AUDIT ingest/validator.py (optional) + │ (6) SHAREGPT FORMAT ingest/sharegpt.py + ▼ +data/chat_sharegpt.json → LLaMA-Factory SFT +``` + +--- + +## 1. Adapter parse — `ingest/adapters/<source>.py` + +Each platform has one adapter that reads its native export and emits a common +**`NormalizedMessage`** stream ([`ingest/message.py`](../ingest/message.py)), +decoupling every downstream stage from any platform's specific schema. 
Whatever +the source, the adapter is responsible for: + +- **identifying which sender is you** (tagging each message `sender_is_self`), +- **filtering non-messages** (system/service events, empty entries), +- **producing plain text** for each message, and +- **preserving reply + sender metadata** (`reply_to_id`, `sender_id`) for the + sessionizing and group-chat stages below. + +Output fields: `chat_id, timestamp, sender_id, sender_is_self, text, +message_id, reply_to_id`. + +> Adding a new platform means writing **only** this adapter so it emits the same +> `NormalizedMessage` stream — stages 2–6 are unchanged. Document it under +> [`docs/sources/`](sources/). Telegram's specifics live in +> [sources/telegram.md](sources/telegram.md). + +## 2. Build conversation samples — `ingest/core.py:build_samples` + +Turns the flat message stream into multi-turn conversations. Three sub-steps: + +**a. Split by silence gap** (`_split_into_conversations`) +Messages in a chat are cut into separate conversations wherever there's a +silence longer than `--conversation-gap` (default **3600s / 1h**). A quiet hour +is treated as a topic boundary. + +**b. Stitch reply-linked splits** (`_merge_by_reply`) +A gap-split is undone when a later message *replies to* an earlier one (via the +adapter's `reply_to_id` metadata): those conversations are unioned back together +and re-sorted. This recovers slow threads that a pure time-gap would wrongly +split. (Sources without reply metadata simply skip this — it's a no-op.) + +**c. Assemble + merge turns** (`_assemble_turns`) +Each message becomes a turn with role `user` (other people) or `assistant` +(you). Consecutive messages from the **same role** within `--message-chain` +(default **30s**) are merged into one turn (people send several quick texts as +one "turn"). Conversations with only one side are dropped — you need both a +`user` and an `assistant` turn to train on. 
+ +Group chats: by default the other side is collapsed into a single `user` +speaker. With **`--multi-speaker`**, each non-self sender keeps their identity +and their turns are labelled (`Bob: ...`); your own turns are never labelled. + +**Output:** `Sample = List[{"role": "user"|"assistant", "text": str}]`. + +## 3. Sensitive-data scan — `ingest/redactor.py` + +A **non-destructive** pass that finds (but does not remove) personal/secret data +so you can review it before training. See [privacy](#privacy-notes). + +- **Regex detectors** ([`ingest/redaction/`](../ingest/redaction/)): emails, + phone numbers, payment cards (Luhn-checked), IP/MAC, API keys/tokens, plus + pluggable country ID packs (`--redact-locales`, default `SG`). Universal + patterns always run. +- **Writes `data/redaction_report.json`** — every finding with `conversation`, + `turn`, `role`, `category`, `detector`, `severity`, masked `preview`, and the + raw `value`. A summary table is printed to the terminal. +- **Optional LLM redaction** (`--llm-redact`): an OpenAI-compatible model flags + context-dependent PII (names, secrets) that regex misses. **Local-first** — + it refuses a hosted API unless `--allow-cloud-redaction` is set, so chat text + never leaves your machine by default. +- Skip with `--skip-redact-scan` (or `--no-audit` to skip scan *and* validation). + +## 4. Apply redaction — `ingest/redactor.py:apply` + +Acts on the findings according to `--redact`: + +| Mode | Effect | +|------|--------| +| `off` *(default)* | Scan + report only. Nothing changed. | +| `replace` | Swap each detected span for a `[CATEGORY]` placeholder. Keeps every conversation; removes the secret. | +| `drop` | Remove any conversation containing a detection. Smaller, more conservative dataset. | + +`--redact` is honoured even if the scan was skipped, so the dataset can't +silently retain sensitive data you asked to remove. + +## 5. 
LLM quality audit — `ingest/validator.py` (optional) + +When enabled (`LLM_VALIDATE=true`, an OpenAI-compatible endpoint configured), +an LLM scores each conversation for coherence, quality, and human/assistant +pairing. It **drops weak samples** and can **split over-merged** ones into +cleaner conversations. Disable with `--skip-validation` or `--no-audit`. + +## 6. ShareGPT format — `ingest/sharegpt.py:to_sharegpt` + +Converts role/text samples into the exact ShareGPT shape LLaMA-Factory consumes +and writes `data/chat_sharegpt.json` (registered in +[`configs/dataset_info.json`](../configs/dataset_info.json)). Roles map +`user → human`, `assistant → gpt`. + +**Crucially, each conversation is coerced into the structure LLaMA-Factory's +converter requires** (`_coerce_alternating`): it must **start with `human`**, +**strictly alternate** `human/gpt`, and **end with `gpt`** (even number of +turns). Raw chats break these rules all the time, so the converter: + +- **merges consecutive same-speaker turns** (multi-speaker labels stay in the + text), so alternation holds; +- **drops a leading `gpt` turn** (the other person messaged first — very common); +- **drops a trailing `human` turn** (so the sample ends on a trainable response). + +Without this, LLaMA-Factory silently discards every non-conforming conversation +at train time (logging only `Invalid role tag` / `Invalid message count` +warnings) — on one real export that quietly cut **3,527 samples down to 997**. +With it, ~3,300 of those samples survive and the dataset's reported count matches +what actually trains. See issue +[#42](https://github.com/NotYuSheng/Doppelganger/issues/42). + +> Loss is masked to your (`gpt`) turns only during SFT (`train_on_prompt: false`), +> so `human` turns — including multi-speaker labels — condition the model but are +> never themselves generated. 
+ +### Alternate output: JSONL + +`--format jsonl` writes the intermediate role/text samples +(`data/chat_dataset.jsonl`, one conversation per line) instead — useful for +inspection or custom downstream processing. It does **not** apply the ShareGPT +coercion. + +--- + +## Useful flags (quick reference) + +| Flag | Default | Stage | Purpose | +|------|---------|-------|---------| +| `--source` | `telegram` | 1 | Which adapter parses the export | +| `--input` | `./data/result.json` | 1 | Path to the raw export | +| `--self-name` | auto | 1 | Override "which sender is you" | +| `--conversation-gap` | `3600` | 2a | Silence (s) that starts a new conversation | +| `--message-chain` | `30` | 2c | Max gap (s) to merge same-sender messages into one turn | +| `--multi-speaker` | off | 2c | Keep + label individual senders in group chats | +| `--redact` | `off` | 4 | `off` / `replace` / `drop` | +| `--redact-locales` | `SG` | 3 | Country ID packs for the scan | +| `--llm-redact` | off | 3 | LLM-assisted PII detection (local-first) | +| `--skip-redact-scan` | off | 3 | Skip the sensitive-data scan | +| `--skip-validation` | off | 5 | Skip the LLM quality audit | +| `--no-audit` | off | 3+5 | Skip scan *and* validation | +| `--format` | `sharegpt` | 6 | `sharegpt` (training) or `jsonl` (intermediate) | + +## Privacy notes + +The scan is a **safety net, not a guarantee** — regex and LLM detection both +miss real cases and raise false positives. Before training or sharing anything: +review `data/redaction_report.json` yourself, get consent from others in group +chats, and treat the dataset, `redaction_report.json` (which contains raw +values), trained adapters, and merged checkpoints all as sensitive. They are +gitignored by default; keep them that way. 
diff --git a/docs/sources/telegram.md b/docs/sources/telegram.md new file mode 100644 index 0000000..21e1a60 --- /dev/null +++ b/docs/sources/telegram.md @@ -0,0 +1,82 @@ +# Source: Telegram + +How Doppelganger parses a **Telegram** export into the normalized message stream +that the [shared pipeline](../data-pipeline.md) consumes. This is the only stage +that knows anything Telegram-specific; everything downstream (sessionizing, +scanning, redaction, ShareGPT formatting) is source-agnostic. + +Adapter: [`ingest/adapters/telegram.py`](../../ingest/adapters/telegram.py). +Use it with `--source telegram` (the default). + +## Exporting your data + +In **Telegram Desktop**: `Settings > Advanced > Export Telegram Data`. Select +your chat(s), choose **JSON** format (not HTML), and export. + +The export unzips to a dated folder: + +``` +DataExport_2025-07-09/ +└── result.json ← this is the file the adapter reads +``` + +Point the pipeline at it one of two ways: + +```bash +# a) move/copy it to the default location +cp DataExport_2025-07-09/result.json data/result.json +python -m ingest --source telegram + +# b) or pass the path directly +python -m ingest --source telegram --input DataExport_2025-07-09/result.json +``` + +> `setup.sh` expects `data/result.json`; it will stop with a "not found" error +> until the file is there. + +## What the adapter does + +- **Detects who "you" are.** Read from the export's `personal_information` + (first + last name), or overridden with `--self-name "Your Name"`. If it can't + be determined, the adapter raises rather than guessing — pass `--self-name`. + Every message is tagged `sender_is_self` so the shared pipeline knows which + turns are yours (the ones the model learns to generate). +- **Filters non-messages.** `service` events (pins, joins, calls) and + empty/invalid entries are skipped. 
+- **Joins rich-text fragments.** Telegram stores formatted messages as a list of + entity objects (`text_entities`); the adapter concatenates them back into a + single plain-text string. +- **Reads reply + group metadata.** `reply_to_message_id` (used downstream to + stitch reply-linked conversations) and per-sender identity (used for + `--multi-speaker` group handling) are preserved. + +## Output + +A flat list of `NormalizedMessage` +([`ingest/message.py`](../../ingest/message.py)): + +```python +NormalizedMessage( + chat_id, # which chat the message belongs to + timestamp, # unix seconds + sender_id, # sender display name / id + sender_is_self, # True if this is you + text, # plain-text content + message_id, # for reply resolution + reply_to_id, # the message this replies to, if any +) +``` + +From here the [shared pipeline](../data-pipeline.md) takes over. + +## Telegram-relevant flags + +| Flag | Default | Purpose | +|------|---------|---------| +| `--source` | `telegram` | Selects this adapter | +| `--input` | `./data/result.json` | Path to the Telegram `result.json` | +| `--self-name` | auto | Override "which sender is you" when auto-detection fails or is wrong | +| `--multi-speaker` | off | In group chats, keep + label each non-self sender (`Bob: ...`) instead of collapsing the other side into one speaker | + +All other flags (`--conversation-gap`, `--message-chain`, `--redact`, …) belong +to the shared pipeline — see [data-pipeline.md](../data-pipeline.md). From 280ef835fc6708ab42617903fe0d10c91a575710 Mon Sep 17 00:00:00 2001 From: NotYuSheng Date: Fri, 26 Jun 2026 00:42:47 +0800 Subject: [PATCH 4/6] feat(runner): size-based epoch advisor for `train` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Recommend num_train_epochs from dataset size + effective batch, targeting a ~300-step budget but capping epochs low (and warning) on small datasets — where hitting the budget would just memorise the same few chats. 
Printed in `train` and the `audit` next-step hint; `--epochs auto|N` overrides via a temp config. Co-Authored-By: Claude Opus 4.8 --- doppelganger/__main__.py | 5 +- doppelganger/steps.py | 111 +++++++++++++++++++++++++++++++++++++-- tests/test_runner.py | 55 +++++++++++++++++++ 3 files changed, 165 insertions(+), 6 deletions(-) create mode 100644 tests/test_runner.py diff --git a/doppelganger/__main__.py b/doppelganger/__main__.py index 401ee2a..a765214 100644 --- a/doppelganger/__main__.py +++ b/doppelganger/__main__.py @@ -61,6 +61,9 @@ def build_parser() -> argparse.ArgumentParser: t = sub.add_parser("train", help="LoRA fine-tune on the dataset.") t.add_argument("--config", default=None, help="Training config (default: configs/train_lora[.local].yaml).") t.add_argument("--gpus", default=None, help="CUDA_VISIBLE_DEVICES, e.g. '0' or '0,1,2,3'.") + t.add_argument("--epochs", default=None, + help="Override num_train_epochs: an integer, or 'auto' for the " + "size-based recommendation. Omit to use the config value.") m = sub.add_parser("merge", help="Merge the LoRA adapter into the base model.") m.add_argument("--config", default=None, help="Export config (default: configs/export_lora[.local].yaml).") @@ -88,7 +91,7 @@ def _dispatch(args: argparse.Namespace) -> int: return steps.audit(redact=args.redact, locales=_locales(args.redact_locales), validate=not args.skip_validation) if args.command == "train": - return steps.train(config=args.config, gpus=args.gpus) + return steps.train(config=args.config, gpus=args.gpus, epochs=args.epochs) if args.command == "merge": return steps.merge(config=args.config, gpus=args.gpus) if args.command == "chat": diff --git a/doppelganger/steps.py b/doppelganger/steps.py index 5317249..d83ab62 100644 --- a/doppelganger/steps.py +++ b/doppelganger/steps.py @@ -5,9 +5,12 @@ Every step returns a process-style exit code (0 = success). 
""" +import json +import math import os import subprocess -from typing import List, Optional +import tempfile +from typing import List, Optional, Tuple from ingest import banner, core, redactor, sharegpt from ingest.adapters import available_sources, get_adapter @@ -51,6 +54,81 @@ def adapter_dir() -> Optional[str]: return None +# ── Epoch advisor ───────────────────────────────────────────────────────────── +# Style-SFT wants roughly this many optimisation steps total — enough to learn +# the voice, not so many it memorises specific chats. +_TARGET_STEPS = 300 +_MAX_EPOCHS = 3 + + +def dataset_size() -> int: + """Number of conversations in the training dataset (0 if not built yet).""" + try: + with open(DATASET_PATH, encoding="utf-8") as f: + return len(json.load(f)) + except (OSError, ValueError): + return 0 + + +def effective_batch(cfg: dict, num_gpus: int = 1) -> int: + return (int(cfg.get("per_device_train_batch_size", 1)) + * int(cfg.get("gradient_accumulation_steps", 1)) + * max(1, num_gpus)) + + +def recommend_epochs(n_samples: int, eff_batch: int) -> Tuple[int, int, List[str]]: + """Recommend an epoch count from dataset size, plus warnings. + + Targets ~``_TARGET_STEPS`` optimisation steps, but caps epochs low: on small + datasets, hitting the step budget would just mean memorising the same few + chats, so we cap and warn rather than crank epochs. Returns + ``(epochs, steps_per_epoch, warnings)``. + """ + eff_batch = max(1, eff_batch) + steps_per_epoch = max(1, math.ceil(n_samples / eff_batch)) + epochs = max(1, round(_TARGET_STEPS / steps_per_epoch)) + epochs = min(epochs, _MAX_EPOCHS) + + warnings: List[str] = [] + if n_samples == 0: + warnings.append("no dataset yet — run `parse` + `audit` first.") + elif n_samples < 1000: + epochs = min(epochs, 2) + warnings.append( + f"small dataset ({n_samples} samples) — the model will memorise. 
" + "More chats helps far more than more epochs.") + elif n_samples >= 10000: + epochs = 1 + return epochs, steps_per_epoch, warnings + + +def _advise_epochs(cfg: dict, num_gpus: int) -> int: + """Print an epoch recommendation for the current dataset; return it.""" + n = dataset_size() + eff = effective_batch(cfg, num_gpus) + rec, spe, warnings = recommend_epochs(n, eff) + print(f"[advisor] Dataset: {n} samples · effective batch {eff} " + f"→ {spe} steps/epoch. Recommended: {rec} epoch(s).") + configured = cfg.get("num_train_epochs") + if configured is not None and abs(float(configured) - rec) >= 1: + print(f"[advisor] config sets {configured} epoch(s); recommended {rec}. " + f"Pass `--epochs auto` to use {rec}, or `--epochs N` to set your own.") + for w in warnings: + print(f"[advisor] ⚠️ {w}") + return rec + + +def _config_with_epochs(cfg_path: str, epochs: float) -> str: + """Write a temp copy of the config with ``num_train_epochs`` overridden.""" + cfg = _read_yaml(cfg_path) + cfg["num_train_epochs"] = float(epochs) + import yaml + fd, path = tempfile.mkstemp(prefix="train_override_", suffix=".yaml") + with os.fdopen(fd, "w", encoding="utf-8") as f: + yaml.safe_dump(cfg, f, sort_keys=False) + return path + + # ── Status (for the menu's checkmarks / out-of-order guards) ────────────────── def parse_done() -> bool: return os.path.exists(RAW_PATH) @@ -145,7 +223,12 @@ def audit( written = sharegpt.write_sharegpt(samples, DATASET_PATH) print(f"Wrote {written} ShareGPT samples to {DATASET_PATH}.") - print("Next: run `train`.") + try: + cfg = _read_yaml(train_config()) + rec, spe, _ = recommend_epochs(written, effective_batch(cfg)) + print(f"Next: run `train` (recommended ~{rec} epoch(s) for {written} samples).") + except OSError: + print("Next: run `train`.") return 0 @@ -162,12 +245,30 @@ def _llamafactory(args: List[str], gpus: Optional[str] = None) -> int: return 127 -def train(config: Optional[str] = None, gpus: Optional[str] = None) -> int: - """Stage 3: LoRA 
fine-tune via LLaMA-Factory.""" +def train(config: Optional[str] = None, gpus: Optional[str] = None, + epochs: Optional[str] = None) -> int: + """Stage 3: LoRA fine-tune via LLaMA-Factory. + + ``epochs`` may be ``None`` (use the config value), an integer string, or + ``"auto"`` (use the size-based recommendation). + """ if not audit_done(): print(f"warning: {DATASET_PATH} not found — run `parse` + `audit` (or `auto`) first.") return 1 - return _llamafactory(["train", config or train_config()], gpus=gpus) + + cfg_path = config or train_config() + cfg = _read_yaml(cfg_path) + num_gpus = len(gpus.split(",")) if gpus else 1 + recommended = _advise_epochs(cfg, num_gpus) + + if epochs == "auto": + cfg_path = _config_with_epochs(cfg_path, recommended) + print(f"[advisor] --epochs auto → training for {recommended} epoch(s).") + elif epochs is not None: + cfg_path = _config_with_epochs(cfg_path, float(epochs)) + print(f"[advisor] --epochs {epochs} → overriding config.") + + return _llamafactory(["train", cfg_path], gpus=gpus) def merge(config: Optional[str] = None, gpus: Optional[str] = None) -> int: diff --git a/tests/test_runner.py b/tests/test_runner.py new file mode 100644 index 0000000..241f6a5 --- /dev/null +++ b/tests/test_runner.py @@ -0,0 +1,55 @@ +"""Unit tests for the doppelganger runner's pure helpers (no GPU/network/IO). + + python -m unittest discover -s tests -t . 
+""" + +import os +import sys +import unittest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from doppelganger import steps + + +class EffectiveBatchTest(unittest.TestCase): + def test_multiplies_batch_accum_gpus(self): + cfg = {"per_device_train_batch_size": 2, "gradient_accumulation_steps": 8} + self.assertEqual(steps.effective_batch(cfg, num_gpus=1), 16) + self.assertEqual(steps.effective_batch(cfg, num_gpus=4), 64) + + def test_defaults_when_missing(self): + self.assertEqual(steps.effective_batch({}, num_gpus=1), 1) + + +class RecommendEpochsTest(unittest.TestCase): + def test_healthy_midsize_dataset_one_epoch(self): + # ~3.3k samples, eff batch 16 -> ~208 steps/epoch -> 1 epoch. + epochs, spe, warnings = steps.recommend_epochs(3324, 16) + self.assertEqual(epochs, 1) + self.assertEqual(spe, 208) + self.assertEqual(warnings, []) + + def test_small_dataset_is_capped_and_warned(self): + epochs, _, warnings = steps.recommend_epochs(997, 16) + self.assertLessEqual(epochs, 2) # capped, not cranked to hit step budget + self.assertTrue(any("small dataset" in w for w in warnings)) + + def test_large_dataset_one_epoch(self): + epochs, _, warnings = steps.recommend_epochs(20000, 16) + self.assertEqual(epochs, 1) + self.assertEqual(warnings, []) + + def test_never_exceeds_cap(self): + for n in (1200, 1500, 2500, 5000): + epochs, _, _ = steps.recommend_epochs(n, 16) + self.assertGreaterEqual(epochs, 1) + self.assertLessEqual(epochs, steps._MAX_EPOCHS) + + def test_no_dataset_warns(self): + _, _, warnings = steps.recommend_epochs(0, 16) + self.assertTrue(warnings) + + +if __name__ == "__main__": + unittest.main() From 28e9b78752f35907d18f99916c665d22312975f1 Mon Sep 17 00:00:00 2001 From: NotYuSheng Date: Fri, 26 Jun 2026 00:46:50 +0800 Subject: [PATCH 5/6] fix/harden-runner-cli-error-handling Address PR review: - adapter_dir / chat: catch Exception (missing PyYAML, malformed config), not just OSError, so the menu and chat don't crash 
on a clean environment - parse: wrap adapter.parse in try/except for a clean error on malformed exports or self-name detection failure - train / merge / chat: fail fast with a clear message when the config is missing - chat: accept --config so it aligns with the config train ran on - menu: handle EOFError / KeyboardInterrupt as a clean exit (0 / 130) - docs: drop the raw `value` field from the redaction_report.json contract (it is not emitted by ingest/redactor.py on this branch) Co-Authored-By: Claude Opus 4.8 --- docs/data-pipeline.md | 4 ++-- doppelganger/__main__.py | 12 ++++++++++-- doppelganger/steps.py | 35 +++++++++++++++++++++++++++++------ 3 files changed, 41 insertions(+), 10 deletions(-) diff --git a/docs/data-pipeline.md b/docs/data-pipeline.md index 9fe5d90..f96025d 100644 --- a/docs/data-pipeline.md +++ b/docs/data-pipeline.md @@ -94,8 +94,8 @@ so you can review it before training. See [privacy](#privacy-notes). pluggable country ID packs (`--redact-locales`, default `SG`). Universal patterns always run. - **Writes `data/redaction_report.json`** — every finding with `conversation`, - `turn`, `role`, `category`, `detector`, `severity`, masked `preview`, and the - raw `value`. A summary table is printed to the terminal. + `turn`, `role`, `category`, `detector`, `severity`, and a masked `preview`. + A summary table is printed to the terminal. - **Optional LLM redaction** (`--llm-redact`): an OpenAI-compatible model flags context-dependent PII (names, secrets) that regex misses. 
**Local-first** — it refuses a hosted API unless `--allow-cloud-redaction` is set, so chat text diff --git a/doppelganger/__main__.py b/doppelganger/__main__.py index a765214..64ae952 100644 --- a/doppelganger/__main__.py +++ b/doppelganger/__main__.py @@ -70,6 +70,7 @@ def build_parser() -> argparse.ArgumentParser: m.add_argument("--gpus", default=None, help="CUDA_VISIBLE_DEVICES.") c = sub.add_parser("chat", help="Chat with the fine-tuned model.") + c.add_argument("--config", default=None, help="Training config (default: configs/train_lora[.local].yaml).") c.add_argument("--gpus", default=None, help="CUDA_VISIBLE_DEVICES, e.g. '0'.") au = sub.add_parser("auto", help="Run parse → audit end-to-end (optionally → train).") @@ -95,7 +96,7 @@ def _dispatch(args: argparse.Namespace) -> int: if args.command == "merge": return steps.merge(config=args.config, gpus=args.gpus) if args.command == "chat": - return steps.chat(gpus=args.gpus) + return steps.chat(config=args.config, gpus=args.gpus) if args.command == "auto": return steps.auto( do_train=args.do_train, redact=args.redact, gpus=args.gpus, @@ -115,7 +116,14 @@ def _menu() -> int: mark = "✓" if done() else " " print(f" [{mark}] {i}. {name:6} — {desc}") print(" a. 
auto — run parse → audit end-to-end") - choice = input("> ").strip().lower() + try: + choice = input("> ").strip().lower() + except EOFError: # non-interactive stdin + print() + return 0 + except KeyboardInterrupt: # Ctrl-C + print() + return 130 if choice in ("q", "quit", "exit", ""): return 0 diff --git a/doppelganger/steps.py b/doppelganger/steps.py index d83ab62..33f6357 100644 --- a/doppelganger/steps.py +++ b/doppelganger/steps.py @@ -50,7 +50,7 @@ def adapter_dir() -> Optional[str]: """Where training writes the LoRA adapter (``output_dir`` in the config).""" try: return _read_yaml(train_config()).get("output_dir") - except OSError: + except Exception: # missing file, malformed YAML, or PyYAML not installed return None @@ -169,7 +169,11 @@ def parse( os.makedirs(DATA_DIR, exist_ok=True) print(f"Loading {input_path} via '{source}' adapter...") - messages = get_adapter(source).parse(input_path, self_name=self_name) + try: + messages = get_adapter(source).parse(input_path, self_name=self_name) + except Exception as e: # malformed export, bad permissions, self-name detection failure + print(f"error: failed to parse export: {e}") + return 1 print("Building conversation samples...") samples = core.build_samples( @@ -257,6 +261,9 @@ def train(config: Optional[str] = None, gpus: Optional[str] = None, return 1 cfg_path = config or train_config() + if not os.path.exists(cfg_path): + print(f"error: training config not found: {cfg_path}") + return 1 cfg = _read_yaml(cfg_path) num_gpus = len(gpus.split(",")) if gpus else 1 recommended = _advise_epochs(cfg, num_gpus) @@ -273,12 +280,28 @@ def train(config: Optional[str] = None, gpus: Optional[str] = None, def merge(config: Optional[str] = None, gpus: Optional[str] = None) -> int: """Stage 4 (optional): merge the LoRA adapter into the base model.""" - return _llamafactory(["export", config or export_config()], gpus=gpus) + cfg_path = config or export_config() + if not os.path.exists(cfg_path): + print(f"error: export config 
not found: {cfg_path}") + return 1 + return _llamafactory(["export", cfg_path], gpus=gpus) + +def chat(config: Optional[str] = None, gpus: Optional[str] = None) -> int: + """Stage 5: chat with the fine-tuned model (base + adapter, nothing merged). -def chat(gpus: Optional[str] = None) -> int: - """Stage 5: chat with the fine-tuned model (base + adapter, nothing merged).""" - cfg = _read_yaml(train_config()) + Uses the same training ``config`` as :func:`train` so a custom ``--config`` + run points at the right adapter / model / template. + """ + cfg_path = config or train_config() + if not os.path.exists(cfg_path): + print(f"error: training config not found: {cfg_path}") + return 1 + try: + cfg = _read_yaml(cfg_path) + except Exception as e: # malformed YAML or PyYAML not installed + print(f"error: failed to read training config: {e}") + return 1 out = cfg.get("output_dir") if not out or not os.path.exists(os.path.join(out, "adapter_model.safetensors")): print(f"warning: no trained adapter at {out!r} — run `train` first.") From d484b80bc062c11c1936ba605c73042c96c1f005 Mon Sep 17 00:00:00 2001 From: NotYuSheng Date: Fri, 26 Jun 2026 00:53:30 +0800 Subject: [PATCH 6/6] fix/guard-config-parsing-and-temp-file-cleanup Address second PR review round: - effective_batch: coerce batch/accum defensively (missing, null, or non-int values fall back to 1 instead of raising) - audit next-step hint: catch Exception (malformed config / missing PyYAML), not just OSError, so the hint never crashes a successful audit - train: guard _read_yaml with try/except, and delete the temp override config in a finally block so --epochs runs don't leak temp files Co-Authored-By: Claude Opus 4.8 --- doppelganger/steps.py | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/doppelganger/steps.py b/doppelganger/steps.py index 33f6357..89a71fb 100644 --- a/doppelganger/steps.py +++ b/doppelganger/steps.py @@ -71,9 +71,12 @@ def dataset_size() -> int: def 
effective_batch(cfg: dict, num_gpus: int = 1) -> int: - return (int(cfg.get("per_device_train_batch_size", 1)) - * int(cfg.get("gradient_accumulation_steps", 1)) - * max(1, num_gpus)) + def _int(key: str) -> int: + try: + return int(cfg.get(key) or 1) + except (TypeError, ValueError): + return 1 + return _int("per_device_train_batch_size") * _int("gradient_accumulation_steps") * max(1, num_gpus) def recommend_epochs(n_samples: int, eff_batch: int) -> Tuple[int, int, List[str]]: @@ -231,7 +234,7 @@ def audit( cfg = _read_yaml(train_config()) rec, spe, _ = recommend_epochs(written, effective_batch(cfg)) print(f"Next: run `train` (recommended ~{rec} epoch(s) for {written} samples).") - except OSError: + except Exception: # missing/malformed config or PyYAML — the hint is optional print("Next: run `train`.") return 0 @@ -264,18 +267,31 @@ def train(config: Optional[str] = None, gpus: Optional[str] = None, if not os.path.exists(cfg_path): print(f"error: training config not found: {cfg_path}") return 1 - cfg = _read_yaml(cfg_path) + try: + cfg = _read_yaml(cfg_path) + except Exception as e: # malformed YAML or PyYAML not installed + print(f"error: failed to read training config: {e}") + return 1 num_gpus = len(gpus.split(",")) if gpus else 1 recommended = _advise_epochs(cfg, num_gpus) + # An overridden epoch count goes through a throwaway temp config we clean up. 
+ temp_cfg = None if epochs == "auto": - cfg_path = _config_with_epochs(cfg_path, recommended) + cfg_path = temp_cfg = _config_with_epochs(cfg_path, recommended) print(f"[advisor] --epochs auto → training for {recommended} epoch(s).") elif epochs is not None: - cfg_path = _config_with_epochs(cfg_path, float(epochs)) + cfg_path = temp_cfg = _config_with_epochs(cfg_path, float(epochs)) print(f"[advisor] --epochs {epochs} → overriding config.") - return _llamafactory(["train", cfg_path], gpus=gpus) + try: + return _llamafactory(["train", cfg_path], gpus=gpus) + finally: + if temp_cfg and os.path.exists(temp_cfg): + try: + os.remove(temp_cfg) + except OSError: + pass def merge(config: Optional[str] = None, gpus: Optional[str] = None) -> int: