Skip to content

Commit 91b30c5

Browse files
committed
feat: enhance transfer process to include profiling artifacts and handle empty uploads
1 parent 4b16788 commit 91b30c5

2 files changed

Lines changed: 32 additions & 17 deletions

File tree

transfers/profiling.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,12 @@ def run(
8787
def upload_profile_artifacts(artifacts: Iterable[ProfileArtifact]) -> None:
8888
"""Upload generated profiling artifacts to the configured storage bucket."""
8989

90-
artifacts = list(artifacts)
9190
if not artifacts:
91+
logger.info("No profiling artifacts to upload")
9292
return
9393

94+
artifacts = list(artifacts)
95+
9496
bucket = get_storage_bucket()
9597
for artifact in artifacts:
9698
for path in (artifact.stats_path, artifact.report_path):

transfers/transfer.py

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,8 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True):
296296
transfer_minor_trace_chemistry,
297297
transfer_nma_stratigraphy,
298298
transfer_associated_data,
299+
profile_waterlevels,
300+
profile_artifacts,
299301
)
300302
else:
301303
_transfer_sequential(
@@ -329,6 +331,8 @@ def transfer_all(metrics, limit=100, profile_waterlevels: bool = True):
329331
profile_artifacts,
330332
)
331333

334+
return profile_artifacts
335+
332336

333337
def _transfer_parallel(
334338
metrics,
@@ -357,6 +361,8 @@ def _transfer_parallel(
357361
transfer_minor_trace_chemistry,
358362
transfer_nma_stratigraphy,
359363
transfer_associated_data,
364+
profile_waterlevels,
365+
profile_artifacts,
360366
):
361367
"""Execute transfers in parallel where possible."""
362368
message("PARALLEL TRANSFER GROUP 1")
@@ -548,24 +554,31 @@ def _transfer_parallel(
548554
("Acoustic", WaterLevelsContinuousAcousticTransferer, flags)
549555
)
550556

551-
with ThreadPoolExecutor(max_workers=2) as executor:
552-
futures = {}
557+
if profile_waterlevels:
553558
for name, klass, task_flags in parallel_tasks_2:
554-
future = executor.submit(
555-
_execute_transfer_with_timing, name, klass, task_flags
556-
)
557-
futures[future] = name
558-
559-
for future in as_completed(futures):
560-
name = futures[future]
561-
try:
562-
result_name, result, elapsed = future.result()
563-
results_map[result_name] = result
564-
logger.info(
565-
f"Parallel task {result_name} completed in {elapsed:.2f}s"
559+
profiler = TransferProfiler(f"waterlevels_continuous_{name.lower()}")
560+
results, artifact = profiler.run(_execute_transfer, klass, task_flags)
561+
profile_artifacts.append(artifact)
562+
results_map[name] = results
563+
else:
564+
with ThreadPoolExecutor(max_workers=2) as executor:
565+
futures = {}
566+
for name, klass, task_flags in parallel_tasks_2:
567+
future = executor.submit(
568+
_execute_transfer_with_timing, name, klass, task_flags
566569
)
567-
except Exception as e:
568-
logger.critical(f"Parallel task {name} failed: {e}")
570+
futures[future] = name
571+
572+
for future in as_completed(futures):
573+
name = futures[future]
574+
try:
575+
result_name, result, elapsed = future.result()
576+
results_map[result_name] = result
577+
logger.info(
578+
f"Parallel task {result_name} completed in {elapsed:.2f}s"
579+
)
580+
except Exception as e:
581+
logger.critical(f"Parallel task {name} failed: {e}")
569582

570583
if "Pressure" in results_map and results_map["Pressure"]:
571584
metrics.pressure_metrics(*results_map["Pressure"])

0 commit comments

Comments
 (0)