 import os
 import pickle
 import shutil
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from collections import defaultdict
 from pathlib import Path
 
 import asv
 import pandas as pd
 
 from datasmith.benchmark.collection import BenchmarkCollection
-from datasmith.docker.context import ContextRegistry
+from datasmith.docker.context import ContextRegistry, DockerContext, Task
 from datasmith.docker.orchestrator import (
-    build_repo_sha_image,
     get_docker_client,
     orchestrate,
 )
-from datasmith.docker.validation import BuildResult, Task
+from datasmith.execution.collect_commits_offline import find_parent_releases
 from datasmith.logging_config import configure_logging
 from datasmith.scrape.utils import _parse_commit_url
 
-# logger = configure_logging(level=logging.DEBUG, stream=open(Path(__file__).with_suffix(".log"), "w"))
-logger = configure_logging(level=logging.DEBUG)
+logger = configure_logging(level=logging.DEBUG, stream=open(Path(__file__).with_suffix(".log"), "w"))  # noqa: SIM115
+# logger = configure_logging(level=logging.DEBUG)
 
 
 def parse_args() -> argparse.Namespace:
@@ -107,11 +106,13 @@ def process_inputs(args: argparse.Namespace) -> dict[tuple[str, str], set[tuple[
             else:
                 all_states[(owner, repo)].add((sha, 0.0))
     elif args.commits:
-        commits = pd.read_json(args.commits, lines=True)
+        commits = (
+            pd.read_json(args.commits, lines=True) if args.commits.suffix == ".jsonl" else pd.read_parquet(args.commits)
+        )
         all_states = {}
         for _, row in commits.iterrows():
             repo_name = row["repo_name"]
-            sha = row["commit_sha"]
+            sha = row["sha"]
             has_asv = row.get("has_asv", True)
             if not has_asv:
                 logger.debug(f"Skipping {repo_name} commit {sha} as it does not have ASV benchmarks.")
@@ -135,16 +136,30 @@ def main(args: argparse.Namespace) -> None:
     context_registry = ContextRegistry.load_from_file(path=args.context_registry)
 
     # Prepare tasks
-    tasks: list[Task] = []
+    tasks: list[tuple[Task, DockerContext]] = []
+    repo_commit_pairs = defaultdict(list)
     for (owner, repo), uniq in all_states.items():
         limited = list(uniq)[: max(0, args.limit_per_repo)] if args.limit_per_repo > 0 else list(uniq)
         for sha, date in limited:
             task = Task(owner, repo, sha, commit_date=date)
             if task in context_registry:
-                tasks.append(task)
+                tasks.append((task, context_registry.get(task)))
+                repo_commit_pairs[f"{owner}/{repo}"].append(task)
+                # also add the parent commit.
             else:
                 logger.debug(f"main: skipping {task} as not in context registry")
 
+    # get all parent commits and add them as tasks as well.
+    for repo_name, tsks in repo_commit_pairs.items():
+        owner, repo = repo_name.split("/")
+        shas = [t.sha for t in tsks]
+        parent_commits = find_parent_releases(repo_name, shas, add_first=True, incl_datetime=True)
+        for i, (parent_sha, date) in enumerate(parent_commits):
+            parent_task = Task(owner=owner, repo=repo, sha=parent_sha, commit_date=date)  # pyright: ignore[reportArgumentType]
+            # use the child context.
+            ctx = context_registry.get(tsks[i])
+            tasks.append((parent_task, ctx))
+
     max_concurrency = (
         args.max_concurrency if args.max_concurrency != -1 else max(4, math.floor(0.5 * (os.cpu_count() or 1)))
     )
@@ -170,44 +185,44 @@ def main(args: argparse.Namespace) -> None:
     }
     logger.debug("main: machine_defaults keys=%d", len(machine_defaults))
 
-    builds: list[BuildResult] = []
-    if args.max_concurrency < 1:
-        for t in tasks:
-            build_res: BuildResult = build_repo_sha_image(
-                client=client,
-                context_registry=context_registry,
-                task=t,
-                force=args.force_rebuild,
-            )
-            builds.append(build_res)
-    else:
-        with ThreadPoolExecutor(max_workers=args.max_concurrency) as pool:
-            futures = [
-                pool.submit(
-                    build_repo_sha_image,
-                    client,
-                    context_registry,
-                    task,
-                    args.force_rebuild,
-                )
-                for task in tasks
-            ]
-            for fut in as_completed(futures):
-                builds.append(fut.result())
-
-    successful_builds = [b for b in builds if b.rc != 1]
-
-    logger.info("Running benchmarks for %d images", len(successful_builds))
-    logger.info("Failed builds for %d images", len(builds) - len(successful_builds))
-    for b in builds:
-        if b.rc == 1:
-            logger.warning("Build failed for %s", b.image_name)
+    # builds: list[BuildResult] = []
+    # if args.max_concurrency < 1:
+    #     for t in tasks:
+    #         build_res: BuildResult = build_repo_sha_image(
+    #             client=client,
+    #             context_registry=context_registry,
+    #             task=t,
+    #             force=args.force_rebuild,
+    #         )
+    #         builds.append(build_res)
+    # else:
+    #     with ThreadPoolExecutor(max_workers=args.max_concurrency) as pool:
+    #         futures = [
+    #             pool.submit(
+    #                 build_repo_sha_image,
+    #                 client,
+    #                 context_registry,
+    #                 task,
+    #                 args.force_rebuild,
+    #             )
+    #             for task in tasks
+    #         ]
+    #         for fut in as_completed(futures):
+    #             builds.append(fut.result())
+
+    # successful_builds = [b for b in builds if b.rc != 1]
+
+    # logger.info("Running benchmarks for %d images", len(successful_builds))
+    # logger.info("Failed builds for %d images", len(builds) - len(successful_builds))
+    # for b in builds:
+    #     if b.rc == 1:
+    #         logger.warning("Build failed for %s", b.image_name)
 
     machine_args: dict[str, str] = asv.machine.Machine.get_defaults()  # pyright: ignore[reportAttributeAccessIssue]
     machine_args["num_cpu"] = str(args.num_cores)
-    files_by_image: dict[str, dict[str, str]] = asyncio.run(
+    files_by_image: dict[Task, dict[str, str]] = asyncio.run(
         orchestrate(
-            docker_image_names=[b.image_name for b in successful_builds],
+            contexts=tasks,
             asv_args=asv_args,
             machine_args=machine_args,
             max_concurrency=max_concurrency,
@@ -217,7 +232,7 @@ def main(args: argparse.Namespace) -> None:
         )
     )
     # save the files by image as a pickle file.
-    with open(output_dir / "files_by_image.pkl", "wb") as f:
+    with open(output_dir / "files_by_image.json", "wb") as f:
         pickle.dump(files_by_image, f)
 
     # save the files by image as a JSON file