diff --git a/scratch/scripts/synthesize_contexts.py b/scratch/scripts/synthesize_contexts.py
index 8ae2883..20d7dcd 100644
--- a/scratch/scripts/synthesize_contexts.py
+++ b/scratch/scripts/synthesize_contexts.py
@@ -3,7 +3,6 @@
 import argparse
 import datetime
 import json
-import os
 import random
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from pathlib import Path
@@ -139,7 +138,8 @@ def main(args: argparse.Namespace) -> None:
     logger.info("Building base image...")
     base_tag = build_base_image(client, DockerContext())
-    os.environ["DOCKER_CACHE_FROM"] = base_tag
+    logger.debug("%s", base_tag)
+    # os.environ["DOCKER_CACHE_FROM"] = base_tag
 
     # Prepare tasks
     tasks = prepare_tasks(all_states, args.limit_per_repo, context_registry)
 
@@ -154,6 +154,7 @@ def main(args: argparse.Namespace) -> None:
         k: str(v.replace(" ", "_").replace("'", "").replace('"', "")) for k, v in machine_defaults.items()
     }
     logger.debug("main: machine_defaults keys=%d", len(machine_defaults))
+    logger.info("main: Starting work on %d tasks[%d workers]", len(tasks), args.max_workers)
 
     results: list[dict] = []
     if args.max_workers < 1:
diff --git a/src/datasmith/docker/context.py b/src/datasmith/docker/context.py
index dac750f..d753362 100644
--- a/src/datasmith/docker/context.py
+++ b/src/datasmith/docker/context.py
@@ -24,6 +24,27 @@
 logger = get_logger("docker.context")
 
 
+def _new_api_client(client: docker.DockerClient) -> docker.APIClient:
+    """
+    Create a fresh low-level APIClient for each build to avoid connection
+    contention across threads and to align API versions with the daemon.
+    """
+    try:
+        base_url = client.api.base_url  # e.g., 'unix://var/run/docker.sock'
+    except Exception:
+        base_url = None
+
+    try:
+        api_version = client.version().get("ApiVersion", "auto")
+    except Exception:
+        api_version = "auto"
+
+    try:
+        return docker.APIClient(base_url=base_url, version=api_version)
+    except Exception:
+        return docker.APIClient(version="auto")
+
+
 def build_base_image(client: docker.DockerClient, ctx: DockerContext) -> str:
     base_key = hash(ctx)
     base_tag = f"asv-base-rev-{base_key}:base"
@@ -135,6 +156,9 @@ class DockerContext:
     base_building_data: str
     building_data: str
 
+    # Cached, reproducible tar bytes per (probe: bool). Immutable => thread-safe reuse.
+    _context_tar_bytes: dict[bool, bytes]
+
     def __init__(
         self,
         building_data: str | None = None,
@@ -160,36 +184,48 @@ def __init__(
         self.base_building_data = base_building_data
         self.building_data = building_data
 
+        self._context_tar_bytes = {}
+
     @staticmethod
     def add_bytes(tar: tarfile.TarFile, name: str, data: bytes, mode: int = 0o644) -> None:
         info = tarfile.TarInfo(name=name)
         info.size = len(data)
         info.mode = mode
-        info.mtime = 0
+        info.mtime = 0  # stable for cache keys
         info.uid = info.gid = 0
         info.uname = info.gname = ""
         tar.addfile(info, io.BytesIO(data))
 
-    def build_tarball_stream(self, probe: bool = False) -> io.BytesIO:
-        tar_stream = io.BytesIO()
-        with tarfile.open(fileobj=tar_stream, mode="w") as tar:
-            # Add Dockerfile
+    def _build_tarball_bytes(self, probe: bool = False) -> bytes:
+        """
+        Build a reproducible tarball (stable mtimes/owners and deterministic order)
+        and return its raw bytes for fast reuse across parallel builds.
+        """
+        buf = io.BytesIO()
+        with tarfile.open(fileobj=buf, mode="w") as tar:
+            # Deterministic order
             DockerContext.add_bytes(tar, "Dockerfile", self.dockerfile_data.encode("utf-8"))
-            # Add entrypoint.sh
             DockerContext.add_bytes(tar, "entrypoint.sh", self.entrypoint_data.encode("utf-8"), mode=0o755)
-            # Add docker_build_env.sh
             DockerContext.add_bytes(tar, "docker_build_env.sh", self.env_building_data.encode("utf-8"), mode=0o755)
-
-            # Add docker_build_base.sh
             DockerContext.add_bytes(tar, "docker_build_base.sh", self.base_building_data.encode("utf-8"), mode=0o755)
-
             if not probe:
-                # Add docker_build_pkg.sh
                 DockerContext.add_bytes(tar, "docker_build_pkg.sh", self.building_data.encode("utf-8"), mode=0o755)
+        buf.seek(0)
+        return buf.getvalue()
+
+    def _get_context_bytes(self, probe: bool = False) -> bytes:
+        """
+        Return cached tar bytes for the requested probe flag, building once lazily.
+        """
+        if probe not in self._context_tar_bytes:
+            self._context_tar_bytes[probe] = self._build_tarball_bytes(probe=probe)
+        return self._context_tar_bytes[probe]
 
-        # Reset the stream position to the beginning
-        tar_stream.seek(0)
-        return tar_stream
+    def build_tarball_stream(self, probe: bool = False) -> io.BytesIO:
+        """
+        Backwards-compatible: return a new BytesIO over the cached tar bytes.
+        """
+        return io.BytesIO(self._get_context_bytes(probe=probe))
 
     def process_image_name(self, image_name: str) -> tuple[str, str]:
         """Split image name into (repo, target). Target is required."""
@@ -221,7 +257,6 @@ def build_container(
             logger.info("Docker image '%s' found locally.", image_name)
         except ImageNotFound:
             logger.info("Docker image '%s' not found locally. Building new image.", image_name)
-            pass  # Image doesn't exist or was removed, proceed to build
 
     if not image_exists:
         cache_from = None
@@ -230,25 +265,36 @@ def build_container(
             build_args = {**build_args, "BASE_IMAGE": base_image}
             cache_from = [base_image]
 
+        if len(build_args) == 0 and not probe:
+            raise RuntimeError(f"Docker image '{image_name}' not found and no REPO_URL provided for build.")
+
+        # Pretty log
         if len(build_args):
             build_args_str = " --build-arg ".join(f"{k}={v}" for k, v in build_args.items())
-            logger.info("$ docker build -t %s src/datasmith/docker/ --build-arg %s", image_name, build_args_str)
-            try:
-                client.images.build(
-                    fileobj=self.build_tarball_stream(probe=probe),
-                    custom_context=True,
-                    tag=image_name,
-                    buildargs={**build_args, "BUILDKIT_INLINE_CACHE": "1"},
-                    target=target,
-                    rm=True,
-                    labels=run_labels,
-                    network_mode=os.environ.get("DOCKER_NETWORK_MODE", None),
-                    cache_from=cache_from,
-                )
-            except DockerException:
-                logger.exception("Failed to build Docker image '%s'", image_name)
+            logger.info("$ docker build -t %s . --build-arg %s", image_name, build_args_str)
         else:
-            raise RuntimeError(f"Docker image '{image_name}' not found and no REPO_URL provided for build.")
+            logger.info("$ docker build -t %s .", image_name)
+
+        api = _new_api_client(client)
+        try:
+            stream = api.build(
+                fileobj=io.BytesIO(self._get_context_bytes(probe=probe)),
+                custom_context=True,
+                tag=image_name,
+                buildargs={**build_args, "BUILDKIT_INLINE_CACHE": "1"},
+                target=target,
+                rm=True,
+                labels=run_labels,
+                network_mode=os.environ.get("DOCKER_NETWORK_MODE", None),
+                cache_from=cache_from,
+                decode=True,
+                pull=False,
+            )
+            # Drain stream to ensure completion
+            for _ in stream:
+                pass
+        except DockerException:
+            logger.exception("Failed to build Docker image '%s'", image_name)
 
         if not client.images.get(image_name):
             raise RuntimeError(f"Image '{image_name}' failed to build and is not found.")
@@ -271,6 +317,10 @@ def build_container_streaming(  # noqa: C901
         SDK-only build with streamed logs, tail capture, and a wall-clock timeout.
         Returns a BuildResult and does NOT raise for typical failures
         (so callers can report immediately).
+
+        Changes vs previous version:
+        - Reuses a cached, reproducible tarball to avoid per-build tarring & cache drift.
+        - Uses a fresh low-level API client per call to avoid connection contention in ThreadPools.
         """
         run_labels = run_labels if run_labels else {}
         _, target = self.process_image_name(image_name)
@@ -299,8 +349,9 @@ def build_container_streaming(  # noqa: C901
         except ImageNotFound:
             logger.info("Docker image '%s' not found locally. Building.", image_name)
 
-        # Streamed build via low-level API for better control
-        tar_stream = self.build_tarball_stream(probe=probe)
+        # Streamed build via fresh low-level API client
+        api = _new_api_client(client)
+        tar_bytes = self._get_context_bytes(probe=probe)
 
         stdout_buf: deque[str] = deque(maxlen=2000)  # chunk-tail buffers
         stderr_buf: deque[str] = deque(maxlen=2000)
@@ -318,8 +369,8 @@ def build_container_streaming(  # noqa: C901
             logger.info("$ docker build -t %s .", image_name)
 
         try:
-            stream = client.api.build(
-                fileobj=tar_stream,
+            stream = api.build(
+                fileobj=io.BytesIO(tar_bytes),
                 custom_context=True,
                 tag=image_name,
                 buildargs={**build_args, "BUILDKIT_INLINE_CACHE": "1"},
@@ -358,14 +409,12 @@ def build_container_streaming(  # noqa: C901
                     if s:
                         stdout_buf.append(s)
                 if "status" in chunk and chunk.get("progressDetail"):
-                    # Status lines (pulling base layers, etc.)—treat as stdout
                     s = str(chunk.get("status", ""))
                     if s:
                         stdout_buf.append(s + "\n")
                 if "error" in chunk or "errorDetail" in chunk:
                     error_seen = (chunk.get("error") or str(chunk.get("errorDetail", ""))).strip()
                     if error_seen:
-                        # also track in stderr tail
                         stderr_buf.append(error_seen + "\n")
                         break
         except APIError:
@@ -594,7 +643,7 @@ def get_similar(self, key: str | Task) -> list[tuple[Task, DockerContext]]:  # n
         1) exact match (if present) — returned Task uses the caller's tag
         2) other SHAs for owner/repo — returned Tasks use the caller's tag
            sorted by |commit_date diff| if available, else by SHA
-        3) base owner/repo — returned Task uses the caller's tag
+        3) base owner/repo — returned Tasks use the caller's tag
         """
         user_task = self.parse_key(key) if isinstance(key, str) else key
         canonical = self._canonicalize(user_task)