From 4c68d88aea0a3be8e88286ed6c796fd183a48527 Mon Sep 17 00:00:00 2001 From: Mohamed Bilel Besbes Date: Fri, 24 Apr 2026 23:19:45 -0400 Subject: [PATCH] Reducing python test execution time --- tests/perfalert/test_methods_alerts.py | 5 +- treeherder/perf/alerts.py | 123 +++++++++++++++---------- treeherder/perf/tasks.py | 5 + 3 files changed, 78 insertions(+), 55 deletions(-) diff --git a/tests/perfalert/test_methods_alerts.py b/tests/perfalert/test_methods_alerts.py index a6541543406..c46838ae9c0 100644 --- a/tests/perfalert/test_methods_alerts.py +++ b/tests/perfalert/test_methods_alerts.py @@ -4,7 +4,6 @@ from treeherder.model.models import Push from treeherder.perf.alerts import ( - build_cpd_methods, create_alert, detect_methods_changes, generate_new_test_alerts_in_series, @@ -572,9 +571,7 @@ def test_detection_tolerance_deduplication_guard_suppresses_nearby_duplicate_ale int(time.mktime(d.push_timestamp.timetuple())), d.push_id, [], [] ) revision_data[d.push_id].values.append(d.value) - analyzed_series = detect_methods_changes( - test_perf_signature, list(revision_data.values()), build_cpd_methods() - ) + analyzed_series = detect_methods_changes(test_perf_signature, list(revision_data.values())) _alerts_mod = sys.modules["treeherder.perf.alerts"] original = create_alert diff --git a/treeherder/perf/alerts.py b/treeherder/perf/alerts.py index 79e04e078e3..ab08cc214f2 100644 --- a/treeherder/perf/alerts.py +++ b/treeherder/perf/alerts.py @@ -299,6 +299,10 @@ def build_cpd_methods(): return methods +# Built once at import time; use this everywhere instead of calling build_cpd_methods() repeatedly. +CPD_METHODS = build_cpd_methods() + + def name_voting_strategy( voting_strategy, min_method_agreement, @@ -324,9 +328,9 @@ def name_voting_strategy( return voting_strategy_naming -def detect_methods_changes(signature, data, methods, replicates_enabled=False): +def detect_methods_changes(signature, data, replicates_enabled=False): analyzed_series = data - for method_impl in methods.values(): + for method_impl in CPD_METHODS.values(): analyzed_series = method_impl.detect_changes(analyzed_series, signature, replicates_enabled) return analyzed_series @@ -378,7 +382,7 @@ def get_methods_detecting_at_index(analyzed_series, index, detection_index_toler """ Get detection data for all methods within detection tolerance margin of the given index. """ - all_methods = build_cpd_methods().keys() + all_methods = CPD_METHODS.keys() methods_data = {} # Check within the detection tolerance @@ -451,14 +455,13 @@ def priority_voting_strategy(analyzed_series, min_method_agreement=3, detection_ # Track which indices we've already added detections for (to avoid duplicates # in both Phase 1 and the fallback equal voting strategy) alerted_indices = set() + last_alerted_index = -detection_index_tolerance - 1 # Phase 1: Student detections (no detection tolerance) for i in range(1, len(analyzed_series)): # This prevents duplicate alerts from being raised for the same underlying change event # since different detection methods may pinpoint it at slightly different indices. - if any( - abs(i - alerted_idx) <= detection_index_tolerance for alerted_idx in alerted_indices - ): + if i - last_alerted_index <= detection_index_tolerance: continue cur = analyzed_series[i] @@ -471,6 +474,7 @@ def priority_voting_strategy(analyzed_series, min_method_agreement=3, detection_ detections.append((i, prev_index, methods_data)) alerted_indices.add(i) + last_alerted_index = i # Phase 2: Fall back to equal voting strategy for indices not caught by Student # Student won't influence the vote here since change_detected["student"] @@ -500,13 +504,12 @@ def equal_voting_strategy( return [] alerted_indices = alerted_indices if alerted_indices is not None else set() + last_alerted_index = max(alerted_indices) if alerted_indices else -detection_index_tolerance - 1 detections = [] for i in range(1, len(analyzed_series)): # Skip if we've already created an alert near this index - if any( - abs(i - alerted_idx) <= detection_index_tolerance for alerted_idx in alerted_indices - ): + if i - last_alerted_index <= detection_index_tolerance: continue # Check how many methods detected a change within the detection tolerance @@ -526,6 +529,7 @@ def equal_voting_strategy( if weighted_index is not None: detections.append((weighted_index, prev_index, methods_detecting_data)) alerted_indices.add(weighted_index) + last_alerted_index = weighted_index return detections @@ -547,7 +551,7 @@ def create_alert( application=signature.application, ) - all_methods = build_cpd_methods().keys() + all_methods = CPD_METHODS.keys() confidences = {} for method in all_methods: @@ -645,13 +649,66 @@ def create_alert( ) +def prepare_series_for_detection(signature, replicates_enabled=REPLICATES): + """ + Fetches series data from the DB and runs all CPD detectors over it. + """ + max_alert_age = datetime.now() - settings.PERFHERDER_ALERTS_MAX_AGE + series = PerformanceDatum.objects.filter(signature=signature, push_timestamp__gte=max_alert_age) + + datum_with_replicates = ( + PerformanceDatum.objects.filter( + signature=signature, + repository=signature.repository, + push_timestamp__gte=max_alert_age, + ) + .annotate( + has_replicate=Exists( + PerformanceDatumReplicate.objects.filter(performance_datum_id=OuterRef("pk")) + ) + ) + .filter(has_replicate=True) + ) + replicates = PerformanceDatumReplicate.objects.filter( + performance_datum_id__in=Subquery(datum_with_replicates.values("id")) + ).values_list("performance_datum_id", "value") + replicates_map: dict[int, list[float]] = {} + for datum_id, value in replicates: + replicates_map.setdefault(datum_id, []).append(value) + + revision_data = {} + for d in series: + if not revision_data.get(d.push_id): + revision_data[d.push_id] = RevisionDatumTest( + int(time.mktime(d.push_timestamp.timetuple())), d.push_id, [], [] + ) + revision_data[d.push_id].values.append(d.value) + revision_data[d.push_id].replicates.extend(replicates_map.get(d.id, [])) + + data = list(revision_data.values()) + return detect_methods_changes(signature, data, replicates_enabled=replicates_enabled) + + +def _trim_series_after(analyzed_series, after_ts): + """ + Return a new list containing only the entries whose push_timestamp falls + strictly after after_ts. The original list is never mutated, keeping it + safe to reuse across multiple voting-strategy calls. + """ + return [d for d in analyzed_series if datetime.utcfromtimestamp(d.push_timestamp) > after_ts] + + def generate_new_test_alerts_in_series( signature, voting_strategy=VOTING_STRATEGY, min_method_agreement=MIN_METHOD_AGREEMENT, detection_index_tolerance=DETECTION_INDEX_TOLERANCE, replicates_enabled=REPLICATES, + analyzed_series=None, ): + """ + Generate test alerts for a signature using the given voting strategy. + """ detection_method_name = name_voting_strategy( voting_strategy, min_method_agreement, @@ -663,10 +720,9 @@ def generate_new_test_alerts_in_series( # (2) the alerts max age # use whichever is newer with transaction.atomic(): - max_alert_age = alert_after_ts = datetime.now() - settings.PERFHERDER_ALERTS_MAX_AGE - series = PerformanceDatum.objects.filter( - signature=signature, push_timestamp__gte=max_alert_age - ) + if analyzed_series is None: + analyzed_series = prepare_series_for_detection(signature, replicates_enabled) + alert_after_ts = datetime.now() - settings.PERFHERDER_ALERTS_MAX_AGE latest_alert_timestamp = ( PerformanceAlertTesting.objects.filter( series_signature=signature, detection_method=detection_method_name @@ -677,48 +733,13 @@ def generate_new_test_alerts_in_series( ) if latest_alert_timestamp: latest_ts = latest_alert_timestamp[0] - series = series.filter(push_timestamp__gt=latest_ts) if latest_ts > alert_after_ts: alert_after_ts = latest_ts - - datum_with_replicates = ( - PerformanceDatum.objects.filter( - signature=signature, - repository=signature.repository, - push_timestamp__gte=alert_after_ts, - ) - .annotate( - has_replicate=Exists( - PerformanceDatumReplicate.objects.filter(performance_datum_id=OuterRef("pk")) - ) - ) - .filter(has_replicate=True) - ) - replicates = PerformanceDatumReplicate.objects.filter( - performance_datum_id__in=Subquery(datum_with_replicates.values("id")) - ).values_list("performance_datum_id", "value") - replicates_map: dict[int, list[float]] = {} - for datum_id, value in replicates: - replicates_map.setdefault(datum_id, []).append(value) - - revision_data = {} - for d in series: - if not revision_data.get(d.push_id): - revision_data[d.push_id] = RevisionDatumTest( - int(time.mktime(d.push_timestamp.timetuple())), d.push_id, [], [] - ) - revision_data[d.push_id].values.append(d.value) - revision_data[d.push_id].replicates.extend(replicates_map.get(d.id, [])) - - data = list(revision_data.values()) - methods = build_cpd_methods() - analyzed_series = detect_methods_changes( - signature, data, methods, replicates_enabled=replicates_enabled - ) + analyzed_series = _trim_series_after(analyzed_series, alert_after_ts) # Apply voting with configurable parameters # min_method_agreement: consensus threshold (absolute number: 3 means 3 methods must agree out of 6 total) - # detection_index_tolerance: tolerance for matching detections (±2 indices) + # detection_index_tolerance: tolerance for matching detections (±1 indices) vote( signature, analyzed_series, diff --git a/treeherder/perf/tasks.py b/treeherder/perf/tasks.py index d5d289ee853..30b99294d2d 100644 --- a/treeherder/perf/tasks.py +++ b/treeherder/perf/tasks.py @@ -7,6 +7,7 @@ from treeherder.perf.alerts import ( generate_new_alerts_in_series, generate_new_test_alerts_in_series, + prepare_series_for_detection, ) from treeherder.perf.models import PerformanceSignature from treeherder.workers.task import retryable_task @@ -20,12 +21,15 @@ def generate_alerts(signature_id): signature = PerformanceSignature.objects.get(id=signature_id) generate_new_alerts_in_series(signature) try: + analyzed_series = prepare_series_for_detection(signature, replicates_enabled=False) + generate_new_test_alerts_in_series( signature, voting_strategy="priority", min_method_agreement=3, detection_index_tolerance=1, replicates_enabled=False, + analyzed_series=analyzed_series, ) generate_new_test_alerts_in_series( signature, @@ -33,6 +37,7 @@ def generate_alerts(signature_id): min_method_agreement=3, detection_index_tolerance=1, replicates_enabled=False, + analyzed_series=analyzed_series, ) except Exception as e: logger.exception(