Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions ci/mkpipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,16 @@ def get_hashes(arch: Arch) -> tuple[str, bool]:
return (hash(deps), check)

def fetch_hashes() -> None:
for arch in [Arch.AARCH64, Arch.X86_64]:
hash_check[arch] = get_hashes(arch)
# Resolve both architectures in parallel since they are independent
# and each involves expensive fingerprinting.
with ThreadPoolExecutor(max_workers=2) as pool:
futures = {
pool.submit(get_hashes, arch): arch
for arch in [Arch.AARCH64, Arch.X86_64]
}
for future in futures:
arch = futures[future]
hash_check[arch] = future.result()

trim_builds_prep_thread = threading.Thread(target=fetch_hashes)
trim_builds_prep_thread.start()
Expand Down Expand Up @@ -718,8 +726,25 @@ def trim_tests_pipeline(
files = future.result()
imported_files[path] = files

# Cache compositions loaded with munge_services=False to extract image
# names from their service configs. This avoids expensive fingerprinting
# and dependency resolution that munge_services=True triggers.
compositions: dict[str, Composition] = {}

def get_composition_image_deps(
    name: str,
) -> list[mzbuild.ResolvedImage]:
    """Return the resolved mzbuild images referenced by a composition's
    services, loading the composition lazily with munge_services=False so
    that no expensive fingerprinting or dependency resolution happens."""
    if name in compositions:
        comp = compositions[name]
    else:
        comp = Composition(repo, name, munge_services=False)
        compositions[name] = comp
    # A service participates iff its config carries an "mzbuild" key.
    services = comp.compose.get("services", {})
    wanted = [svc["mzbuild"] for svc in services.values() if "mzbuild" in svc]
    return [deps[img] for img in wanted if img in deps]

def to_step(config: dict[str, Any]) -> PipelineStep | None:
if "wait" in config or "group" in config:
return None
Expand All @@ -740,9 +765,7 @@ def to_step(config: dict[str, Any]) -> PipelineStep | None:
for plugin_name, plugin_config in plugin.items():
if plugin_name == "./ci/plugins/mzcompose":
name = plugin_config["composition"]
if name not in compositions:
compositions[name] = Composition(repo, name)
for dep in compositions[name].dependencies:
for dep in get_composition_image_deps(name):
step.image_dependencies.add(dep)
composition_path = str(repo.compositions[name])
step.extra_inputs.add(composition_path)
Expand Down
55 changes: 55 additions & 0 deletions misc/python/materialize/cargo.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ class Crate:
examples: The names of all examples in the crate.
"""

_inputs_cache: set[str] | None

def __init__(self, root: Path, path: Path):
self.root = root
self._inputs_cache = None
with open(path / "Cargo.toml") as f:
config = toml.load(f)
self.name = config["package"]["name"]
Expand Down Expand Up @@ -116,6 +119,8 @@ def inputs(self) -> set[str]:
# † As a development convenience, we omit mzcompose configuration files
# within a crate. This is technically incorrect if someone writes
# `include!("mzcompose.py")`, but that seems like a crazy thing to do.
if self._inputs_cache is not None:
return self._inputs_cache
return git.expand_globs(
self.root,
f"{self.path}/**",
Expand Down Expand Up @@ -245,3 +250,53 @@ def visit(c: Crate) -> None:
for d in crate.path_dev_dependencies:
visit(self.crates[d])
return deps

def precompute_crate_inputs(self) -> None:
    """Pre-fetch all crate input files in a single batched git call.

    This replaces ~118 individual pairs of git subprocess calls with a
    single pair, then partitions the results by crate path in Python.
    Results are injected into each crate's `_inputs_cache`, which
    `Crate.inputs` consults before falling back to per-crate git calls.
    """
    from materialize import spawn

    if not self.all_crates:
        # Empty workspace: nothing to cache, and the next() below would
        # otherwise raise StopIteration.
        return

    root = next(iter(self.all_crates.values())).root
    # Use paths relative to root for git specs and partitioning, since
    # git --relative outputs paths relative to cwd (root). Crate paths
    # may be absolute when MZ_ROOT is an absolute path.
    crate_rel_paths = sorted(
        {str(c.path.relative_to(root)) for c in self.all_crates.values()}
    )

    # Mirror Crate.inputs: everything under the crate, minus mzcompose
    # configuration files.
    specs = []
    for p in crate_rel_paths:
        specs.append(f"{p}/**")
        specs.append(f":(exclude){p}/mzcompose")
        specs.append(f":(exclude){p}/mzcompose.py")

    # Diffing against git's well-known empty-tree object lists every
    # tracked file; ls-files --others adds untracked, non-ignored files.
    empty_tree = "4b825dc642cb6eb9a060e54bf8d69288fbee4904"
    diff_files = spawn.capture(
        ["git", "diff", "--name-only", "-z", "--relative", empty_tree, "--"]
        + specs,
        cwd=root,
    )
    ls_files = spawn.capture(
        ["git", "ls-files", "--others", "--exclude-standard", "-z", "--"] + specs,
        cwd=root,
    )
    all_files = {f for f in (diff_files + ls_files).split("\0") if f.strip()}

    # Partition files by crate path, longest match first so a file in a
    # nested crate is attributed to the innermost crate only.
    crate_file_map: dict[str, set[str]] = {p: set() for p in crate_rel_paths}
    sorted_paths = sorted(crate_rel_paths, key=len, reverse=True)
    for f in all_files:
        for cp in sorted_paths:
            if f.startswith(cp + "/"):
                crate_file_map[cp].add(f)
                break

    # Inject cached results into each Crate object.
    for crate in self.all_crates.values():
        rel = str(crate.path.relative_to(root))
        crate._inputs_cache = crate_file_map.get(rel, set())
11 changes: 7 additions & 4 deletions misc/python/materialize/cli/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ def main(argv: list[str]) -> None:
args.command.invoke(args)


def load_composition(args: argparse.Namespace) -> Composition:
def load_composition(
args: argparse.Namespace, munge_services: bool = True
) -> Composition:
"""Loads the composition specified by the command-line arguments."""
if not args.ignore_docker_version:
docker_local_version = Version.parse(
Expand Down Expand Up @@ -205,6 +207,7 @@ def load_composition(args: argparse.Namespace) -> Composition:
project_name=args.project_name,
sanity_restart_mz=args.sanity_restart_mz,
host_network=args.host_network,
munge_services=munge_services,
)
except UnknownCompositionError as e:
if args.find:
Expand Down Expand Up @@ -335,7 +338,7 @@ class ListWorkflowsCommand(Command):
help = "list workflows in the composition"

def run(self, args: argparse.Namespace) -> None:
composition = load_composition(args)
composition = load_composition(args, munge_services=False)
for name in sorted(composition.workflows):
print(name)

Expand All @@ -346,7 +349,7 @@ class DescribeCommand(Command):
help = "describe services and workflows in the composition"

def run(self, args: argparse.Namespace) -> None:
composition = load_composition(args)
composition = load_composition(args, munge_services=False)

workflows = []
for name, fn in composition.workflows.items():
Expand Down Expand Up @@ -387,7 +390,7 @@ class DescriptionCommand(Command):
help = "fetch the Python code description from mzcompose.py"

def run(self, args: argparse.Namespace) -> None:
composition = load_composition(args)
composition = load_composition(args, munge_services=False)
print(composition.description)


Expand Down
103 changes: 86 additions & 17 deletions misc/python/materialize/mzbuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,10 @@ def rewrite_builder_path_for_host(self, path: Path) -> Path:
return path


def docker_images() -> set[str]:
@cache
def docker_images() -> frozenset[str]:
"""List the Docker images available on the local machine."""
return set(
return frozenset(
spawn.capture(["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"])
.strip()
.split("\n")
Expand Down Expand Up @@ -467,18 +468,25 @@ def inputs(self) -> set[str]:
class CargoPreImage(PreImage):
"""A `PreImage` action that uses Cargo."""

def inputs(self) -> set[str]:
inputs = {
"ci/builder",
"Cargo.toml",
# TODO(benesch): we could in theory fingerprint only the subset of
# Cargo.lock that applies to the crates at hand, but that is a
# *lot* of work.
"Cargo.lock",
".cargo/config",
}
@staticmethod
@cache
def _cargo_shared_inputs() -> frozenset[str]:
"""Resolve shared Cargo inputs once and cache the result.

return inputs
This expands the 'ci/builder' directory glob and filters out
non-existent files like '.cargo/config', avoiding repeated
git subprocess calls in fingerprint().
"""
inputs: set[str] = set()
inputs |= git.expand_globs(Path("."), "ci/builder/**")
inputs.add("Cargo.toml")
inputs.add("Cargo.lock")
if Path(".cargo/config").exists():
inputs.add(".cargo/config")
return frozenset(inputs)

def inputs(self) -> set[str]:
return set(CargoPreImage._cargo_shared_inputs())

def extra(self) -> str:
# Cargo images depend on the release mode and whether
Expand Down Expand Up @@ -750,9 +758,12 @@ class Image:

_DOCKERFILE_MZFROM_RE = re.compile(rb"^MZFROM\s*(\S+)")

_context_files_cache: set[str] | None

def __init__(self, rd: RepositoryDetails, path: Path):
self.rd = rd
self.path = path
self._context_files_cache = None
self.pre_images: list[PreImage] = []
with open(self.path / "mzbuild.yml") as f:
data = yaml.safe_load(f)
Expand Down Expand Up @@ -1066,7 +1077,10 @@ def inputs(self, transitive: bool = False) -> set[str]:
inputs: A list of input files, relative to the root of the
repository.
"""
paths = set(git.expand_globs(self.image.rd.root, f"{self.image.path}/**"))
if self.image._context_files_cache is not None:
paths = set(self.image._context_files_cache)
else:
paths = set(git.expand_globs(self.image.rd.root, f"{self.image.path}/**"))
if not paths:
# While we could find an `mzbuild.yml` file for this service, expand_globs didn't
# return any files that matched this service. At the very least, the `mzbuild.yml`
Expand Down Expand Up @@ -1094,9 +1108,15 @@ def fingerprint(self) -> Fingerprint:
inputs via `PreImage.inputs`.
"""
self_hash = hashlib.sha1()
for rel_path in sorted(
set(git.expand_globs(self.image.rd.root, *self.inputs()))
):
# When inputs come from precomputed sources (crate and image context
# batching + resolved CargoPreImage paths), they are already individual
# file paths from git. Skip the expensive expand_globs subprocess calls.
inputs = self.inputs()
if self.image._context_files_cache is not None:
resolved_inputs = sorted(inputs)
else:
resolved_inputs = sorted(set(git.expand_globs(self.image.rd.root, *inputs)))
for rel_path in resolved_inputs:
abs_path = self.image.rd.root / rel_path
file_hash = hashlib.sha1()
raw_file_mode = os.lstat(abs_path).st_mode
Expand Down Expand Up @@ -1486,6 +1506,13 @@ def resolve_dependencies(self, targets: Iterable[Image]) -> DependencySet:
ValueError: A circular dependency was discovered in the images
in the repository.
"""
# Pre-fetch all crate input files in a single batched git call,
# replacing ~118 individual subprocess pairs with one pair.
self.rd.cargo_workspace.precompute_crate_inputs()
# Pre-fetch all image context files in a single batched git call,
# replacing ~41 individual subprocess pairs with one pair.
self._precompute_image_context_files()

resolved = OrderedDict()
visiting = set()

Expand All @@ -1506,6 +1533,48 @@ def visit(image: Image, path: list[str] = []) -> None:

return DependencySet(resolved.values())

def _precompute_image_context_files(self) -> None:
    """Pre-fetch all image context files in a single batched git call.

    This replaces ~41 individual pairs of git subprocess calls (one per
    image) with a single pair, then partitions the results by image path
    into each image's `_context_files_cache`.
    """
    root = self.rd.root
    # Use paths relative to root for git specs and partitioning, since
    # git --relative outputs paths relative to cwd (root). Image paths
    # may be absolute when MZ_ROOT is an absolute path.
    image_rel_paths = sorted(
        {str(img.path.relative_to(root)) for img in self.images.values()}
    )
    if not image_rel_paths:
        # With an empty pathspec list, `git diff ... --` would list the
        # entire repository; there is nothing to cache anyway.
        return
    specs = [f"{p}/**" for p in image_rel_paths]

    # Diffing against git's well-known empty-tree object lists every
    # tracked file; ls-files --others adds untracked, non-ignored files.
    empty_tree = "4b825dc642cb6eb9a060e54bf8d69288fbee4904"
    diff_files = spawn.capture(
        ["git", "diff", "--name-only", "-z", "--relative", empty_tree, "--"]
        + specs,
        cwd=root,
    )
    ls_files = spawn.capture(
        ["git", "ls-files", "--others", "--exclude-standard", "-z", "--"] + specs,
        cwd=root,
    )
    all_files = {f for f in (diff_files + ls_files).split("\0") if f.strip()}

    # Partition files by image path, longest match first so a file under
    # a nested image directory is attributed to the innermost image only.
    image_file_map: dict[str, set[str]] = {p: set() for p in image_rel_paths}
    sorted_paths = sorted(image_rel_paths, key=len, reverse=True)
    for f in all_files:
        for ip in sorted_paths:
            if f.startswith(ip + "/"):
                image_file_map[ip].add(f)
                break

    for img in self.images.values():
        rel = str(img.path.relative_to(root))
        img._context_files_cache = image_file_map.get(rel, set())

def __iter__(self) -> Iterator[Image]:
return iter(self.images.values())

Expand Down