From 76c6450a8e2bccb7fe4a143c94367e6b4a7f94d6 Mon Sep 17 00:00:00 2001 From: Chris Geyer Date: Mon, 16 Mar 2026 02:29:04 +0000 Subject: [PATCH 1/6] Batch DB operations and git calls in post-run pipeline Reduces per-file post-processing overhead from ~8.5ms to ~2.8ms by: - Batch git ls-files: single subprocess call instead of one per file in classify_all (files.py) - Batch artifact registration: bulk hash lookup and bulk insert instead of per-file ORM queries (artifact.py, job_recording.py) - Batch job edge creation: add_inputs_batch/add_outputs_batch with single flush instead of per-file insert+flush (job.py) - Batch hash retrieval: get_hashes_batch eliminates N+1 queries in get_inputs/get_outputs (artifact.py, job.py) Co-Authored-By: Claude Opus 4.6 (1M context) --- roar/db/repositories/artifact.py | 96 +++++++++++++++++++++++++++++++ roar/db/repositories/job.py | 28 ++++++++- roar/db/services/job_recording.py | 23 +++++--- roar/filters/files.py | 37 +++++++++--- 4 files changed, 167 insertions(+), 17 deletions(-) diff --git a/roar/db/repositories/artifact.py b/roar/db/repositories/artifact.py index 8b9f6992..239b5c24 100644 --- a/roar/db/repositories/artifact.py +++ b/roar/db/repositories/artifact.py @@ -106,6 +106,74 @@ def register( self._session.flush() return artifact_id, True + def register_batch(self, items: list[tuple[dict[str, str], int, str | None]]) -> list[str]: + """Register multiple artifacts at once. Returns list of artifact_ids. + + items = [(hashes_dict, size, path), ...] + """ + if not items: + return [] + + # Collect all digests for the primary algorithm to check for existing artifacts + all_digests: dict[str, int] = {} # digest -> index in items + primary_algo: str | None = None + for i, (hashes, size, path) in enumerate(items): + for algo, digest in hashes.items(): + if primary_algo is None: + primary_algo = algo + if algo == primary_algo: + all_digests[digest.lower()] = i + + # Bulk lookup existing artifacts by primary hash + existing_map: dict[str, str] = {} # digest -> artifact_id + if primary_algo and all_digests: + rows = self._session.execute( + select(ArtifactHash.digest, ArtifactHash.artifact_id).where( + ArtifactHash.algorithm == primary_algo, + ArtifactHash.digest.in_(list(all_digests.keys())), + ) + ).all() + for digest, artifact_id in rows: + existing_map[digest] = artifact_id + + # Process items + artifact_ids: list[str] = [] + new_artifacts: list[Artifact] = [] + new_hashes: list[ArtifactHash] = [] + for hashes, size, path in items: + primary_digest = hashes.get(primary_algo, "").lower() if primary_algo else "" + if primary_digest in existing_map: + artifact_ids.append(existing_map[primary_digest]) + else: + artifact_id = secrets.token_hex(16) + artifact_ids.append(artifact_id) + new_artifacts.append( + Artifact( + id=artifact_id, + size=size, + first_seen_at=time.time(), + first_seen_path=path, + ) + ) + for algo, digest in hashes.items(): + new_hashes.append( + ArtifactHash( + artifact_id=artifact_id, + algorithm=algo, + digest=digest.lower(), + ) + ) + existing_map[primary_digest] = artifact_id # prevent dupes within batch + + if new_artifacts: + self._session.add_all(new_artifacts) + if new_hashes: + self._session.add_all(new_hashes) + if new_artifacts or new_hashes: + self._session.flush() + + return artifact_ids + def get(self, artifact_id: str) -> dict[str, Any] | None: """ Get artifact by ID. @@ -148,6 +216,34 @@ def get_hashes(self, artifact_id: str) -> list[dict[str, Any]]: for h in hashes ] + def get_hashes_batch(self, artifact_ids: list[str]) -> dict[str, list[dict[str, Any]]]: + """ + Get hashes for multiple artifacts in a single query. + + Args: + artifact_ids: List of artifact UUIDs + + Returns: + Dict mapping artifact_id to list of hash dicts. + """ + if not artifact_ids: + return {} + + rows = ( + self._session.execute( + select(ArtifactHash).where(ArtifactHash.artifact_id.in_(artifact_ids)) + ) + .scalars() + .all() + ) + + result: dict[str, list[dict[str, Any]]] = {aid: [] for aid in artifact_ids} + for h in rows: + result.setdefault(h.artifact_id, []).append( + {"algorithm": h.algorithm, "digest": h.digest} + ) + return result + def get_by_hash(self, digest: str, algorithm: str | None = None) -> dict[str, Any] | None: """ Get artifact by hash digest. diff --git a/roar/db/repositories/job.py b/roar/db/repositories/job.py index 920aef9e..912d61f9 100644 --- a/roar/db/repositories/job.py +++ b/roar/db/repositories/job.py @@ -274,6 +274,22 @@ def add_output( self._session.add(job_output) self._session.flush() + def add_inputs_batch(self, job_id: int, items: list[tuple[str, str]]) -> None: + """Bulk-insert input records. items = [(artifact_id, path), ...]""" + if not items: + return + objects = [JobInput(job_id=job_id, artifact_id=aid, path=p) for aid, p in items] + self._session.add_all(objects) + self._session.flush() + + def add_outputs_batch(self, job_id: int, items: list[tuple[str, str]]) -> None: + """Bulk-insert output records. items = [(artifact_id, path), ...]""" + if not items: + return + objects = [JobOutput(job_id=job_id, artifact_id=aid, path=p) for aid, p in items] + self._session.add_all(objects) + self._session.flush() + def has_input_path(self, job_id: int, path: str) -> bool: """Check whether an input row already exists for a job/path pair.""" existing = self._session.execute( @@ -319,9 +335,13 @@ def get_inputs(self, job_id: int) -> list[dict[str, Any]]: ) rows = self._session.execute(query).all() + # Batch-fetch all hashes in one query + artifact_ids = list({row[1] for row in rows}) + all_hashes = self._artifact_repository.get_hashes_batch(artifact_ids) + results = [] for path, artifact_id, byte_ranges, size, first_seen_path, kind, component_count in rows: - hashes = self._artifact_repository.get_hashes(artifact_id) + hashes = all_hashes.get(artifact_id, []) results.append( { "path": path or first_seen_path, # Use artifact path as fallback @@ -363,9 +383,13 @@ def get_outputs(self, job_id: int) -> list[dict[str, Any]]: ) rows = self._session.execute(query).all() + # Batch-fetch all hashes in one query + artifact_ids = list({row[1] for row in rows}) + all_hashes = self._artifact_repository.get_hashes_batch(artifact_ids) + results = [] for path, artifact_id, byte_ranges, size, first_seen_path, kind, component_count in rows: - hashes = self._artifact_repository.get_hashes(artifact_id) + hashes = all_hashes.get(artifact_id, []) results.append( { "path": path or first_seen_path, # Use artifact path as fallback diff --git a/roar/db/services/job_recording.py b/roar/db/services/job_recording.py index 55376847..bd52c585 100644 --- a/roar/db/services/job_recording.py +++ b/roar/db/services/job_recording.py @@ -217,6 +217,9 @@ def _register_artifacts( is_input: bool, ) -> None: """Register artifacts and link them to the job.""" + # Build batch items, skipping paths that already have edges for this job + batch_items: list[tuple[dict[str, str], int, str | None]] = [] + valid_paths: list[str] = [] for path in file_paths: if is_input and self._job_repo.has_input_path(job_id, path): continue @@ -226,22 +229,28 @@ def _register_artifacts( path_hashes = hashes_by_path.get(path) if not path_hashes: continue - hashes = {algo: digest for algo in hash_algorithms if (digest := path_hashes.get(algo))} if not hashes: continue - try: size = os.path.getsize(path) except OSError: size = 0 + batch_items.append((hashes, size, path)) + valid_paths.append(path) - artifact_id, _ = self._artifact_repo.register(hashes, size, path) + if not batch_items: + return - if is_input: - self._job_repo.add_input(job_id, artifact_id, path) - else: - self._job_repo.add_output(job_id, artifact_id, path) + # Batch register artifacts + artifact_ids = self._artifact_repo.register_batch(batch_items) + + # Batch create edges + edges = list(zip(artifact_ids, valid_paths)) + if is_input: + self._job_repo.add_inputs_batch(job_id, edges) + else: + self._job_repo.add_outputs_batch(job_id, edges) @staticmethod def _unique_paths(paths: list[str]) -> list[str]: diff --git a/roar/filters/files.py b/roar/filters/files.py index 34173054..7296976a 100644 --- a/roar/filters/files.py +++ b/roar/filters/files.py @@ -187,7 +187,20 @@ def _build_package_file_map(self) -> tuple[dict, dict]: # file_to_pkg is intentionally empty; classify() uses path extraction. return {}, pkg_versions - def classify(self, path: str) -> tuple[str, str | None]: + def _get_git_tracked_files(self) -> set[str]: + """Run ``git ls-files`` once and return a set of repo-relative paths.""" + try: + output = subprocess.check_output( + ["git", "ls-files"], + cwd=str(self.repo_root), + text=True, + stderr=subprocess.DEVNULL, + ) + return set(output.splitlines()) + except (subprocess.CalledProcessError, FileNotFoundError): + return set() + + def classify(self, path: str, git_tracked: set[str] | None = None) -> tuple[str, str | None]: """ Classify a file into one of: - "repo": tracked in the git repo @@ -224,12 +237,18 @@ def classify(self, path: str) -> tuple[str, str | None]: else: try: rel = Path(path_str).relative_to(self.repo_root) - subprocess.check_output( - ["git", "ls-files", "--error-unmatch", str(rel)], - cwd=str(self.repo_root), - stderr=subprocess.DEVNULL, - ) - return ("repo", None) + if git_tracked is not None: + if str(rel) in git_tracked: + return ("repo", None) + else: + return ("unmanaged", None) + else: + subprocess.check_output( + ["git", "ls-files", "--error-unmatch", str(rel)], + cwd=str(self.repo_root), + stderr=subprocess.DEVNULL, + ) + return ("repo", None) except subprocess.CalledProcessError: # In repo but not tracked - could be generated file return ("unmanaged", None) @@ -360,10 +379,12 @@ def classify_all(self, paths: list[str]) -> dict: "skip": 0, } + git_tracked = self._get_git_tracked_files() + for path in paths: if not path: continue - classification, pkg_name = self.classify(path) + classification, pkg_name = self.classify(path, git_tracked=git_tracked) stats[classification] = stats.get(classification, 0) + 1 if classification == "repo": From 8b8db3847ae4e2ee761df9c2a60ef7a8c186b78b Mon Sep 17 00:00:00 2001 From: Chris Geyer Date: Mon, 16 Mar 2026 13:17:46 +0000 Subject: [PATCH 2/6] Batch remaining per-file DB queries in post-run artifact registration Replace per-file has_input_path/has_output_path calls with single IN-clause queries, fix redundant setdefault in get_hashes_batch, and document register_batch's reduced signature vs register(). Co-Authored-By: Claude Opus 4.6 (1M context) --- roar/db/repositories/artifact.py | 7 ++++++- roar/db/repositories/job.py | 18 ++++++++++++++++++ roar/db/services/job_recording.py | 12 ++++++++---- 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/roar/db/repositories/artifact.py b/roar/db/repositories/artifact.py index 239b5c24..086ac5ae 100644 --- a/roar/db/repositories/artifact.py +++ b/roar/db/repositories/artifact.py @@ -110,6 +110,11 @@ def register_batch(self, items: list[tuple[dict[str, str], int, str | None]]) -> """Register multiple artifacts at once. Returns list of artifact_ids. items = [(hashes_dict, size, path), ...] + + Note: unlike ``register()``, this does not accept ``source_type``, + ``source_url``, or ``metadata``, and does not backfill missing hash + algorithms on existing artifacts. Intended for the post-run + registration hot path where those fields are not needed. """ if not items: return [] @@ -239,7 +244,7 @@ def get_hashes_batch(self, artifact_ids: list[str]) -> dict[str, list[dict[str, result: dict[str, list[dict[str, Any]]] = {aid: [] for aid in artifact_ids} for h in rows: - result.setdefault(h.artifact_id, []).append( + result[h.artifact_id].append( {"algorithm": h.algorithm, "digest": h.digest} ) return result diff --git a/roar/db/repositories/job.py b/roar/db/repositories/job.py index 912d61f9..855abb28 100644 --- a/roar/db/repositories/job.py +++ b/roar/db/repositories/job.py @@ -310,6 +310,24 @@ def has_output_path(self, job_id: int, path: str) -> bool: ).scalar_one_or_none() return existing is not None + def existing_input_paths(self, job_id: int, paths: list[str]) -> set[str]: + """Return the subset of *paths* that already have input rows for *job_id*.""" + if not paths: + return set() + rows = self._session.execute( + select(JobInput.path).where(JobInput.job_id == job_id, JobInput.path.in_(paths)) + ).scalars().all() + return set(rows) + + def existing_output_paths(self, job_id: int, paths: list[str]) -> set[str]: + """Return the subset of *paths* that already have output rows for *job_id*.""" + if not paths: + return set() + rows = self._session.execute( + select(JobOutput.path).where(JobOutput.job_id == job_id, JobOutput.path.in_(paths)) + ).scalars().all() + return set(rows) + def get_inputs(self, job_id: int) -> list[dict[str, Any]]: """ Get input artifacts for a job. diff --git a/roar/db/services/job_recording.py b/roar/db/services/job_recording.py index bd52c585..d67c1db1 100644 --- a/roar/db/services/job_recording.py +++ b/roar/db/services/job_recording.py @@ -217,13 +217,17 @@ def _register_artifacts( is_input: bool, ) -> None: """Register artifacts and link them to the job.""" - # Build batch items, skipping paths that already have edges for this job + # Batch-check which paths already have edges for this job + if is_input: + already_linked = self._job_repo.existing_input_paths(job_id, file_paths) + else: + already_linked = self._job_repo.existing_output_paths(job_id, file_paths) + + # Build batch items, skipping paths that already have edges batch_items: list[tuple[dict[str, str], int, str | None]] = [] valid_paths: list[str] = [] for path in file_paths: - if is_input and self._job_repo.has_input_path(job_id, path): - continue - if not is_input and self._job_repo.has_output_path(job_id, path): + if path in already_linked: continue path_hashes = hashes_by_path.get(path) From ac9fa05ff1bacfe0c018627e9b9da356ceaa5d86 Mon Sep 17 00:00:00 2001 From: Chris Geyer Date: Mon, 16 Mar 2026 14:51:06 +0000 Subject: [PATCH 3/6] Add ROAR_TIMING instrumentation for phase-level timing breakdown When ROAR_TIMING=1 is set, prints a JSON timing summary to stderr with tracer, post-run, provenance, and record phase durations. Co-Authored-By: Claude Opus 4.6 (1M context) --- roar/services/execution/coordinator.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/roar/services/execution/coordinator.py b/roar/services/execution/coordinator.py index 3beb4338..b9d190ce 100644 --- a/roar/services/execution/coordinator.py +++ b/roar/services/execution/coordinator.py @@ -193,6 +193,9 @@ def stop_proxy_if_running() -> list: from ...core.interfaces.run import RunResult from .provenance import ProvenanceService + emit_timing = os.environ.get("ROAR_TIMING") == "1" + t_postrun_start = time.perf_counter() + bootstrap(ctx.roar_dir) config = load_config(start_dir=ctx.repo_root) @@ -226,12 +229,14 @@ def stop_proxy_if_running() -> list: ) roar_dir = os.path.join(ctx.repo_root, ".roar") provenance_service = ProvenanceService(cache_dir=roar_dir) + t_prov_start = time.perf_counter() prov = provenance_service.collect( ctx.repo_root, tracer_result.tracer_log_path, inject_log, config, ) + t_prov_end = time.perf_counter() self.logger.debug( "Provenance collected: read_files=%d, written_files=%d", len(prov.get("data", {}).get("read_files", [])), @@ -245,6 +250,7 @@ def stop_proxy_if_running() -> list: # Record in database self.logger.debug("Recording job in database") + t_record_start = time.perf_counter() job_id, job_uid, read_file_info, written_file_info, stale_upstream, stale_downstream = ( self._record_job( ctx, @@ -256,6 +262,7 @@ def stop_proxy_if_running() -> list: run_job_uid=run_job_uid, ) ) + t_record_end = time.perf_counter() self.logger.debug( "Job recorded: id=%d, uid=%s, inputs=%d, outputs=%d", job_id, @@ -270,6 +277,24 @@ def stop_proxy_if_running() -> list: self.logger.debug("Cleaning up temporary log files") self._cleanup_logs(tracer_result.tracer_log_path, tracer_result.inject_log_path) + t_postrun_end = time.perf_counter() + + if emit_timing: + import json as _json + + n_inputs = len(prov.get("data", {}).get("read_files", [])) + n_outputs = len(prov.get("data", {}).get("written_files", [])) + timing = { + "roar_timing": True, + "tracer_ms": round(tracer_result.duration * 1000, 2), + "postrun_ms": round((t_postrun_end - t_postrun_start) * 1000, 2), + "provenance_ms": round((t_prov_end - t_prov_start) * 1000, 2), + "record_ms": round((t_record_end - t_record_start) * 1000, 2), + "n_inputs": n_inputs, + "n_outputs": n_outputs, + } + print(f"ROAR_TIMING:{_json.dumps(timing)}", file=sys.stderr, flush=True) + self.logger.debug( "RunCoordinator.execute completed: exit_code=%d, duration=%.2fs", tracer_result.exit_code, From 642290e7f3fa29d6ab59f507a8544232c5858f77 Mon Sep 17 00:00:00 2001 From: Chris Geyer Date: Mon, 16 Mar 2026 19:30:51 +0000 Subject: [PATCH 4/6] Fix ruff lint errors in batch methods Co-Authored-By: Claude Opus 4.6 (1M context) --- roar/db/repositories/artifact.py | 2 +- roar/db/services/job_recording.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/roar/db/repositories/artifact.py b/roar/db/repositories/artifact.py index 086ac5ae..89264c0f 100644 --- a/roar/db/repositories/artifact.py +++ b/roar/db/repositories/artifact.py @@ -122,7 +122,7 @@ def register_batch(self, items: list[tuple[dict[str, str], int, str | None]]) -> # Collect all digests for the primary algorithm to check for existing artifacts all_digests: dict[str, int] = {} # digest -> index in items primary_algo: str | None = None - for i, (hashes, size, path) in enumerate(items): + for i, (hashes, _size, _path) in enumerate(items): for algo, digest in hashes.items(): if primary_algo is None: primary_algo = algo diff --git a/roar/db/services/job_recording.py b/roar/db/services/job_recording.py index b302d83d..38679f4e 100644 --- a/roar/db/services/job_recording.py +++ b/roar/db/services/job_recording.py @@ -256,7 +256,7 @@ def _register_artifacts( artifact_ids = self._artifact_repo.register_batch(batch_items) # Batch create edges - edges = list(zip(artifact_ids, valid_paths)) + edges = list(zip(artifact_ids, valid_paths, strict=True)) if is_input: self._job_repo.add_inputs_batch(job_id, edges) else: From aff09fc58e8ceac1c31f87010519d11c62e77525 Mon Sep 17 00:00:00 2001 From: Chris Geyer Date: Mon, 16 Mar 2026 19:52:11 +0000 Subject: [PATCH 5/6] Format batch methods for CI lint Co-Authored-By: Claude Opus 4.6 (1M context) --- roar/db/repositories/artifact.py | 4 +--- roar/db/repositories/job.py | 20 ++++++++++++++------ 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/roar/db/repositories/artifact.py b/roar/db/repositories/artifact.py index 89264c0f..67eb1288 100644 --- a/roar/db/repositories/artifact.py +++ b/roar/db/repositories/artifact.py @@ -244,9 +244,7 @@ def get_hashes_batch(self, artifact_ids: list[str]) -> dict[str, list[dict[str, result: dict[str, list[dict[str, Any]]] = {aid: [] for aid in artifact_ids} for h in rows: - result[h.artifact_id].append( - {"algorithm": h.algorithm, "digest": h.digest} - ) + result[h.artifact_id].append({"algorithm": h.algorithm, "digest": h.digest}) return result def get_by_hash(self, digest: str, algorithm: str | None = None) -> dict[str, Any] | None: diff --git a/roar/db/repositories/job.py b/roar/db/repositories/job.py index 7505149d..38cade65 100644 --- a/roar/db/repositories/job.py +++ b/roar/db/repositories/job.py @@ -320,18 +320,26 @@ def existing_input_paths(self, job_id: int, paths: list[str]) -> set[str]: """Return the subset of *paths* that already have input rows for *job_id*.""" if not paths: return set() - rows = self._session.execute( - select(JobInput.path).where(JobInput.job_id == job_id, JobInput.path.in_(paths)) - ).scalars().all() + rows = ( + self._session.execute( + select(JobInput.path).where(JobInput.job_id == job_id, JobInput.path.in_(paths)) + ) + .scalars() + .all() + ) return set(rows) def existing_output_paths(self, job_id: int, paths: list[str]) -> set[str]: """Return the subset of *paths* that already have output rows for *job_id*.""" if not paths: return set() - rows = self._session.execute( - select(JobOutput.path).where(JobOutput.job_id == job_id, JobOutput.path.in_(paths)) - ).scalars().all() + rows = ( + self._session.execute( + select(JobOutput.path).where(JobOutput.job_id == job_id, JobOutput.path.in_(paths)) + ) + .scalars() + .all() + ) return set(rows) def get_inputs(self, job_id: int) -> list[dict[str, Any]]: From 9d60a07d8e2d4896276597f9566411fae3be4d16 Mon Sep 17 00:00:00 2001 From: Chris Geyer Date: Tue, 17 Mar 2026 14:31:49 +0000 Subject: [PATCH 6/6] Add get_hashes_batch to ArtifactRepository protocol Co-Authored-By: Claude Opus 4.6 (1M context) --- roar/core/interfaces/repositories.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/roar/core/interfaces/repositories.py b/roar/core/interfaces/repositories.py index 9dec2c12..a7e53707 100644 --- a/roar/core/interfaces/repositories.py +++ b/roar/core/interfaces/repositories.py @@ -36,6 +36,10 @@ def get_hashes(self, artifact_id: str) -> list[dict[str, Any]]: """Get all hashes for an artifact.""" ... + def get_hashes_batch(self, artifact_ids: list[str]) -> dict[str, list[dict[str, Any]]]: + """Get hashes for multiple artifacts in a single query.""" + ... + def get_locations(self, artifact_id: str) -> list[dict[str, str]]: """Get all known locations for an artifact.""" ...