Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions scratch/scripts/synthesize_contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import argparse
import datetime
import json
import os
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
Expand Down Expand Up @@ -139,7 +138,8 @@ def main(args: argparse.Namespace) -> None:

logger.info("Building base image...")
base_tag = build_base_image(client, DockerContext())
os.environ["DOCKER_CACHE_FROM"] = base_tag
logger.debug("%s", base_tag)
# os.environ["DOCKER_CACHE_FROM"] = base_tag

# Prepare tasks
tasks = prepare_tasks(all_states, args.limit_per_repo, context_registry)
Expand All @@ -154,6 +154,7 @@ def main(args: argparse.Namespace) -> None:
k: str(v.replace(" ", "_").replace("'", "").replace('"', "")) for k, v in machine_defaults.items()
}
logger.debug("main: machine_defaults keys=%d", len(machine_defaults))
logger.info("main: Starting work on %d tasks[%d workers]", len(tasks), args.max_workers)

results: list[dict] = []
if args.max_workers < 1:
Expand Down
125 changes: 87 additions & 38 deletions src/datasmith/docker/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,27 @@
logger = get_logger("docker.context")


def _new_api_client(client: docker.DockerClient) -> docker.APIClient:
    """
    Return a dedicated low-level APIClient for a single build.

    A fresh client per build sidesteps connection contention between
    threads and pins the API version to whatever the daemon reports.
    Every probe here is best-effort: on any failure we fall back to an
    auto-negotiated client against the default endpoint.
    """
    base_url = None
    try:
        # e.g. 'unix://var/run/docker.sock'
        base_url = client.api.base_url
    except Exception:
        pass

    api_version = "auto"
    try:
        api_version = client.version().get("ApiVersion", "auto")
    except Exception:
        pass

    try:
        return docker.APIClient(base_url=base_url, version=api_version)
    except Exception:
        return docker.APIClient(version="auto")


def build_base_image(client: docker.DockerClient, ctx: DockerContext) -> str:
base_key = hash(ctx)
base_tag = f"asv-base-rev-{base_key}:base"
Expand Down Expand Up @@ -135,6 +156,9 @@ class DockerContext:
base_building_data: str
building_data: str

# Cached, reproducible tar bytes per (probe: bool). Immutable => thread-safe reuse.
_context_tar_bytes: dict[bool, bytes]

def __init__(
self,
building_data: str | None = None,
Expand All @@ -160,36 +184,48 @@ def __init__(
self.base_building_data = base_building_data
self.building_data = building_data

self._context_tar_bytes = {}

@staticmethod
def add_bytes(tar: tarfile.TarFile, name: str, data: bytes, mode: int = 0o644) -> None:
info = tarfile.TarInfo(name=name)
info.size = len(data)
info.mode = mode
info.mtime = 0
info.mtime = 0 # stable for cache keys
info.uid = info.gid = 0
info.uname = info.gname = ""
tar.addfile(info, io.BytesIO(data))

def build_tarball_stream(self, probe: bool = False) -> io.BytesIO:
tar_stream = io.BytesIO()
with tarfile.open(fileobj=tar_stream, mode="w") as tar:
# Add Dockerfile
def _build_tarball_bytes(self, probe: bool = False) -> bytes:
    """
    Serialize the build context into reproducible tar bytes.

    Entries are written in a fixed order with stable metadata so the
    same inputs always yield the same archive; the returned raw bytes
    can be reused cheaply across parallel builds. When *probe* is true
    the package build script is omitted from the context.
    """
    entries: list[tuple[str, str, int]] = [
        ("Dockerfile", self.dockerfile_data, 0o644),
        ("entrypoint.sh", self.entrypoint_data, 0o755),
        ("docker_build_env.sh", self.env_building_data, 0o755),
        ("docker_build_base.sh", self.base_building_data, 0o755),
    ]
    if not probe:
        entries.append(("docker_build_pkg.sh", self.building_data, 0o755))

    sink = io.BytesIO()
    with tarfile.open(fileobj=sink, mode="w") as archive:
        for name, text, perm in entries:
            DockerContext.add_bytes(archive, name, text.encode("utf-8"), mode=perm)
    return sink.getvalue()

def _get_context_bytes(self, probe: bool = False) -> bytes:
"""
Return cached tar bytes for the requested probe flag, building once lazily.
"""
if probe not in self._context_tar_bytes:
self._context_tar_bytes[probe] = self._build_tarball_bytes(probe=probe)
return self._context_tar_bytes[probe]

# Reset the stream position to the beginning
tar_stream.seek(0)
return tar_stream
def build_tarball_stream(self, probe: bool = False) -> io.BytesIO:
    """
    Compatibility shim: wrap the cached context bytes in a fresh stream.

    Each call hands back an independent BytesIO positioned at the start,
    so concurrent consumers never share a read cursor.
    """
    data = self._get_context_bytes(probe=probe)
    return io.BytesIO(data)

def process_image_name(self, image_name: str) -> tuple[str, str]:
"""Split image name into (repo, target). Target is required."""
Expand Down Expand Up @@ -221,7 +257,6 @@ def build_container(
logger.info("Docker image '%s' found locally.", image_name)
except ImageNotFound:
logger.info("Docker image '%s' not found locally. Building new image.", image_name)
pass # Image doesn't exist or was removed, proceed to build

if not image_exists:
cache_from = None
Expand All @@ -230,25 +265,36 @@ def build_container(
build_args = {**build_args, "BASE_IMAGE": base_image}
cache_from = [base_image]

if len(build_args) == 0 and not probe:
raise RuntimeError(f"Docker image '{image_name}' not found and no REPO_URL provided for build.")

# Pretty log
if len(build_args):
build_args_str = " --build-arg ".join(f"{k}={v}" for k, v in build_args.items())
logger.info("$ docker build -t %s src/datasmith/docker/ --build-arg %s", image_name, build_args_str)
try:
client.images.build(
fileobj=self.build_tarball_stream(probe=probe),
custom_context=True,
tag=image_name,
buildargs={**build_args, "BUILDKIT_INLINE_CACHE": "1"},
target=target,
rm=True,
labels=run_labels,
network_mode=os.environ.get("DOCKER_NETWORK_MODE", None),
cache_from=cache_from,
)
except DockerException:
logger.exception("Failed to build Docker image '%s'", image_name)
logger.info("$ docker build -t %s . --build-arg %s", image_name, build_args_str)
else:
raise RuntimeError(f"Docker image '{image_name}' not found and no REPO_URL provided for build.")
logger.info("$ docker build -t %s .", image_name)

api = _new_api_client(client)
try:
stream = api.build(
fileobj=io.BytesIO(self._get_context_bytes(probe=probe)),
custom_context=True,
tag=image_name,
buildargs={**build_args, "BUILDKIT_INLINE_CACHE": "1"},
target=target,
rm=True,
labels=run_labels,
network_mode=os.environ.get("DOCKER_NETWORK_MODE", None),
cache_from=cache_from,
decode=True,
pull=False,
)
# Drain stream to ensure completion
for _ in stream:
pass
except DockerException:
logger.exception("Failed to build Docker image '%s'", image_name)

if not client.images.get(image_name):
raise RuntimeError(f"Image '{image_name}' failed to build and is not found.")
Expand All @@ -271,6 +317,10 @@ def build_container_streaming( # noqa: C901
SDK-only build with streamed logs, tail capture, and a wall-clock timeout.
Returns a BuildResult and does NOT raise for typical failures (so callers can
report immediately).

Changes vs previous version:
- Reuses a cached, reproducible tarball to avoid per-build tarring & cache drift.
- Uses a fresh low-level API client per call to avoid connection contention in ThreadPools.
"""
run_labels = run_labels if run_labels else {}
_, target = self.process_image_name(image_name)
Expand Down Expand Up @@ -299,8 +349,9 @@ def build_container_streaming( # noqa: C901
except ImageNotFound:
logger.info("Docker image '%s' not found locally. Building.", image_name)

# Streamed build via low-level API for better control
tar_stream = self.build_tarball_stream(probe=probe)
# Streamed build via fresh low-level API client
api = _new_api_client(client)
tar_bytes = self._get_context_bytes(probe=probe)
stdout_buf: deque[str] = deque(maxlen=2000) # chunk-tail buffers
stderr_buf: deque[str] = deque(maxlen=2000)

Expand All @@ -318,8 +369,8 @@ def build_container_streaming( # noqa: C901
logger.info("$ docker build -t %s .", image_name)

try:
stream = client.api.build(
fileobj=tar_stream,
stream = api.build(
fileobj=io.BytesIO(tar_bytes),
custom_context=True,
tag=image_name,
buildargs={**build_args, "BUILDKIT_INLINE_CACHE": "1"},
Expand Down Expand Up @@ -358,14 +409,12 @@ def build_container_streaming( # noqa: C901
if s:
stdout_buf.append(s)
if "status" in chunk and chunk.get("progressDetail"):
# Status lines (pulling base layers, etc.)—treat as stdout
s = str(chunk.get("status", ""))
if s:
stdout_buf.append(s + "\n")
if "error" in chunk or "errorDetail" in chunk:
error_seen = (chunk.get("error") or str(chunk.get("errorDetail", ""))).strip()
if error_seen:
# also track in stderr tail
stderr_buf.append(error_seen + "\n")
break
except APIError:
Expand Down Expand Up @@ -594,7 +643,7 @@ def get_similar(self, key: str | Task) -> list[tuple[Task, DockerContext]]: # n
1) exact match (if present) — returned Task uses the caller's tag
2) other SHAs for owner/repo — returned Tasks use the caller's tag
sorted by |commit_date diff| if available, else by SHA
3) base owner/repo — returned Task uses the caller's tag
3) base owner/repo — returned Tasks use the caller's tag
"""
user_task = self.parse_key(key) if isinstance(key, str) else key
canonical = self._canonicalize(user_task)
Expand Down
Loading