|
| 1 | +from typing import Any, Dict, Literal |
| 2 | + |
| 3 | +import dspy |
| 4 | +from dspy.clients.lm import LM |
| 5 | +from dspy.primitives import Module |
| 6 | +from dspy.teleprompt.gepa.gepa import GEPA |
| 7 | +from gepa.core.adapter import ProposalFn |
| 8 | +from gepa.proposer.reflective_mutation.base import ReflectionComponentSelector |
| 9 | + |
| 10 | +from eval_protocol.models import EPParameters, EvaluationRow |
| 11 | +from eval_protocol.pytest.types import TestFunction |
| 12 | +from eval_protocol.training.gepa_utils import REFLECTION_LM_CONFIGS |
| 13 | +from eval_protocol.training.utils import build_ep_parameters_from_test |
| 14 | + |
| 15 | + |
class GEPATrainer:
    """
    High-level entrypoint for running GEPA-style training against an existing
    `@evaluation_test`-decorated function.

    This class is intentionally minimal for now:
    - It captures `EPParameters` from the provided test function via
      `build_ep_parameters_from_test`.
    - It stores any GEPA-related configuration kwargs for future use.
    - The actual GEPA optimization loop is left as a TODO.
    """

    def __init__(self, test_fn: TestFunction) -> None:
        """
        Args:
            test_fn: The `@evaluation_test`-decorated function defining the eval.
        """
        self.test_fn = test_fn
        self.ep_params: EPParameters = build_ep_parameters_from_test(test_fn)

        # TODO: need to convert our ep test_fn to a GEPA metric; also need to
        # inject the feedback text.
        self.metric = test_fn

        # TODO: converting between a program (dspy.Module) and an
        # @evaluation_test is a bit tricky.
        self.program = ...

        # TODO: need to convert our input_dataset to a train set.
        self.train_set = ...
        self.val_set = ...
        self.test_set = ...

    def train(
        self,
        auto: Literal["light", "medium", "heavy"] | None = None,
        max_full_evals: int | None = None,
        max_metric_calls: int | None = None,
        reflection_minibatch_size: int = 3,
        candidate_selection_strategy: Literal["pareto", "current_best"] = "pareto",
        reflection_lm: LM | None = None,
        skip_perfect_score: bool = True,
        add_format_failure_as_feedback: bool = False,
        instruction_proposer: ProposalFn | None = None,
        component_selector: ReflectionComponentSelector | str = "round_robin",
        use_merge: bool = True,
        max_merge_invocations: int | None = 5,
        num_threads: int | None = None,
        failure_score: float = 0.0,
        perfect_score: float = 1.0,
        log_dir: str | None = None,
        track_stats: bool = False,
        use_wandb: bool = False,
        wandb_api_key: str | None = None,
        wandb_init_kwargs: dict[str, Any] | None = None,
        track_best_outputs: bool = False,
        warn_on_score_mismatch: bool = True,
        enable_tool_optimization: bool = False,
        use_mlflow: bool = False,
        seed: int | None = 0,
        gepa_kwargs: dict | None = None,
    ) -> Module:
        """
        Run GEPA to optimize over candidates.

        All keyword arguments mirror the `dspy.teleprompt.GEPA` constructor;
        `gepa_kwargs` entries override any of the named parameters.
        """
        # Collect the named configuration into one kwargs mapping for GEPA.
        optimizer_kwargs: dict[str, Any] = dict(
            auto=auto,
            max_full_evals=max_full_evals,
            max_metric_calls=max_metric_calls,
            reflection_minibatch_size=reflection_minibatch_size,
            candidate_selection_strategy=candidate_selection_strategy,
            reflection_lm=reflection_lm,
            skip_perfect_score=skip_perfect_score,
            add_format_failure_as_feedback=add_format_failure_as_feedback,
            instruction_proposer=instruction_proposer,
            component_selector=component_selector,
            use_merge=use_merge,
            max_merge_invocations=max_merge_invocations,
            num_threads=num_threads,
            failure_score=failure_score,
            perfect_score=perfect_score,
            log_dir=log_dir,
            track_stats=track_stats,
            use_wandb=use_wandb,
            wandb_api_key=wandb_api_key,
            wandb_init_kwargs=wandb_init_kwargs,
            track_best_outputs=track_best_outputs,
            warn_on_score_mismatch=warn_on_score_mismatch,
            enable_tool_optimization=enable_tool_optimization,
            use_mlflow=use_mlflow,
            seed=seed,
        )
        # Caller-supplied overrides win over the named parameters.
        if gepa_kwargs:
            optimizer_kwargs.update(gepa_kwargs)

        optimizer = GEPA(metric=self.metric, **optimizer_kwargs)
        return optimizer.compile(
            self.program,
            trainset=self.train_set,
            valset=self.val_set,
        )

    def evaluate(self, optimized_program: Module) -> list[EvaluationRow]:
        """Evaluate an optimized program. Not implemented yet."""
        # TODO: convert back to EP and then just run our evaluation_test
        # function on the optimized program.
        #
        # OR we can evaluate using dspy.Evaluate:
        #
        # evaluate = dspy.Evaluate(
        #     devset=self.test_set,
        #     metric=self.metric,
        #     num_threads=32,
        #     display_table=True,
        #     display_progress=True
        # )
        # return evaluate(self.optimized_program)
        ...
0 commit comments