-
Notifications
You must be signed in to change notification settings - Fork 382
Reducing python test execution time #9466
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -299,6 +299,10 @@ def build_cpd_methods(): | |
| return methods | ||
|
|
||
|
|
||
| # Built once at import time; use this everywhere instead of calling build_cpd_methods() repeatedly. | ||
| CPD_METHODS = build_cpd_methods() | ||
|
|
||
|
|
||
| def name_voting_strategy( | ||
| voting_strategy, | ||
| min_method_agreement, | ||
|
|
@@ -324,9 +328,9 @@ def name_voting_strategy( | |
| return voting_strategy_naming | ||
|
|
||
|
|
||
| def detect_methods_changes(signature, data, methods, replicates_enabled=False): | ||
| def detect_methods_changes(signature, data, replicates_enabled=False): | ||
| analyzed_series = data | ||
| for method_impl in methods.values(): | ||
| for method_impl in CPD_METHODS.values(): | ||
| analyzed_series = method_impl.detect_changes(analyzed_series, signature, replicates_enabled) | ||
| return analyzed_series | ||
|
|
||
|
|
@@ -378,7 +382,7 @@ def get_methods_detecting_at_index(analyzed_series, index, detection_index_toler | |
| """ | ||
| Get detection data for all methods within detection tolerance margin of the given index. | ||
| """ | ||
| all_methods = build_cpd_methods().keys() | ||
| all_methods = CPD_METHODS.keys() | ||
| methods_data = {} | ||
|
|
||
| # Check within the detection tolerance | ||
|
|
@@ -451,14 +455,13 @@ def priority_voting_strategy(analyzed_series, min_method_agreement=3, detection_ | |
| # Track which indices we've already added detections for (to avoid duplicates | ||
| # in both Phase 1 and the fallback equal voting strategy) | ||
| alerted_indices = set() | ||
| last_alerted_index = -detection_index_tolerance - 1 | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, why is there a negative sign now for detection_index_tolerance? This seems convoluted.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Basically the negative sign is just there so the very first data point doesn't get skipped cuz it sets the starting value far enough back that the proximity check never accidentally fires before any real alert has been created |
||
|
|
||
| # Phase 1: Student detections (no detection tolerance) | ||
| for i in range(1, len(analyzed_series)): | ||
| # This prevents duplicate alerts from being raised for the same underlying change event | ||
| # since different detection methods may pinpoint it at slightly different indices. | ||
| if any( | ||
| abs(i - alerted_idx) <= detection_index_tolerance for alerted_idx in alerted_indices | ||
| ): | ||
| if i - last_alerted_index <= detection_index_tolerance: | ||
| continue | ||
|
|
||
| cur = analyzed_series[i] | ||
|
|
@@ -471,6 +474,7 @@ def priority_voting_strategy(analyzed_series, min_method_agreement=3, detection_ | |
|
|
||
| detections.append((i, prev_index, methods_data)) | ||
| alerted_indices.add(i) | ||
| last_alerted_index = i | ||
|
|
||
| # Phase 2: Fall back to equal voting strategy for indices not caught by Student | ||
| # Student won't influence the vote here since change_detected["student"] | ||
|
|
@@ -500,13 +504,12 @@ def equal_voting_strategy( | |
| return [] | ||
|
|
||
| alerted_indices = alerted_indices if alerted_indices is not None else set() | ||
| last_alerted_index = max(alerted_indices) if alerted_indices else -detection_index_tolerance - 1 | ||
| detections = [] | ||
|
|
||
| for i in range(1, len(analyzed_series)): | ||
| # Skip if we've already created an alert near this index | ||
| if any( | ||
| abs(i - alerted_idx) <= detection_index_tolerance for alerted_idx in alerted_indices | ||
| ): | ||
| if i - last_alerted_index <= detection_index_tolerance: | ||
| continue | ||
|
|
||
| # Check how many methods detected a change within the detection tolerance | ||
|
|
@@ -526,6 +529,7 @@ def equal_voting_strategy( | |
| if weighted_index is not None: | ||
| detections.append((weighted_index, prev_index, methods_detecting_data)) | ||
| alerted_indices.add(weighted_index) | ||
| last_alerted_index = weighted_index | ||
|
|
||
| return detections | ||
|
|
||
|
|
@@ -547,7 +551,7 @@ def create_alert( | |
| application=signature.application, | ||
| ) | ||
|
|
||
| all_methods = build_cpd_methods().keys() | ||
| all_methods = CPD_METHODS.keys() | ||
| confidences = {} | ||
|
|
||
| for method in all_methods: | ||
|
|
@@ -645,13 +649,66 @@ def create_alert( | |
| ) | ||
|
|
||
|
|
||
| def prepare_series_for_detection(signature, replicates_enabled=REPLICATES): | ||
| """ | ||
| Fetches series data from the DB and runs all CPD detectors over it. | ||
| """ | ||
| max_alert_age = datetime.now() - settings.PERFHERDER_ALERTS_MAX_AGE | ||
| series = PerformanceDatum.objects.filter(signature=signature, push_timestamp__gte=max_alert_age) | ||
|
|
||
| datum_with_replicates = ( | ||
| PerformanceDatum.objects.filter( | ||
| signature=signature, | ||
| repository=signature.repository, | ||
| push_timestamp__gte=max_alert_age, | ||
| ) | ||
| .annotate( | ||
| has_replicate=Exists( | ||
| PerformanceDatumReplicate.objects.filter(performance_datum_id=OuterRef("pk")) | ||
| ) | ||
| ) | ||
| .filter(has_replicate=True) | ||
| ) | ||
| replicates = PerformanceDatumReplicate.objects.filter( | ||
| performance_datum_id__in=Subquery(datum_with_replicates.values("id")) | ||
| ).values_list("performance_datum_id", "value") | ||
| replicates_map: dict[int, list[float]] = {} | ||
| for datum_id, value in replicates: | ||
| replicates_map.setdefault(datum_id, []).append(value) | ||
|
|
||
| revision_data = {} | ||
| for d in series: | ||
| if not revision_data.get(d.push_id): | ||
| revision_data[d.push_id] = RevisionDatumTest( | ||
| int(time.mktime(d.push_timestamp.timetuple())), d.push_id, [], [] | ||
| ) | ||
| revision_data[d.push_id].values.append(d.value) | ||
| revision_data[d.push_id].replicates.extend(replicates_map.get(d.id, [])) | ||
|
|
||
| data = list(revision_data.values()) | ||
| return detect_methods_changes(signature, data, replicates_enabled=replicates_enabled) | ||
|
|
||
|
|
||
| def _trim_series_after(analyzed_series, after_ts): | ||
| """ | ||
| Return a new list containing only the entries whose push_timestamp falls | ||
| strictly after after_ts. The original list is never mutated, keeping it | ||
| safe to reuse across multiple voting-strategy calls. | ||
| """ | ||
| return [d for d in analyzed_series if datetime.utcfromtimestamp(d.push_timestamp) > after_ts] | ||
|
|
||
|
|
||
| def generate_new_test_alerts_in_series( | ||
| signature, | ||
| voting_strategy=VOTING_STRATEGY, | ||
| min_method_agreement=MIN_METHOD_AGREEMENT, | ||
| detection_index_tolerance=DETECTION_INDEX_TOLERANCE, | ||
| replicates_enabled=REPLICATES, | ||
| analyzed_series=None, | ||
| ): | ||
| """ | ||
| Generate test alerts for a signature using the given voting strategy. | ||
| """ | ||
| detection_method_name = name_voting_strategy( | ||
| voting_strategy, | ||
| min_method_agreement, | ||
|
|
@@ -663,10 +720,9 @@ def generate_new_test_alerts_in_series( | |
| # (2) the alerts max age | ||
| # use whichever is newer | ||
| with transaction.atomic(): | ||
| max_alert_age = alert_after_ts = datetime.now() - settings.PERFHERDER_ALERTS_MAX_AGE | ||
| series = PerformanceDatum.objects.filter( | ||
| signature=signature, push_timestamp__gte=max_alert_age | ||
| ) | ||
| if analyzed_series is None: | ||
| analyzed_series = prepare_series_for_detection(signature, replicates_enabled) | ||
| alert_after_ts = datetime.now() - settings.PERFHERDER_ALERTS_MAX_AGE | ||
| latest_alert_timestamp = ( | ||
| PerformanceAlertTesting.objects.filter( | ||
| series_signature=signature, detection_method=detection_method_name | ||
|
|
@@ -677,48 +733,13 @@ def generate_new_test_alerts_in_series( | |
| ) | ||
| if latest_alert_timestamp: | ||
| latest_ts = latest_alert_timestamp[0] | ||
| series = series.filter(push_timestamp__gt=latest_ts) | ||
| if latest_ts > alert_after_ts: | ||
| alert_after_ts = latest_ts | ||
|
|
||
| datum_with_replicates = ( | ||
| PerformanceDatum.objects.filter( | ||
| signature=signature, | ||
| repository=signature.repository, | ||
| push_timestamp__gte=alert_after_ts, | ||
| ) | ||
| .annotate( | ||
| has_replicate=Exists( | ||
| PerformanceDatumReplicate.objects.filter(performance_datum_id=OuterRef("pk")) | ||
| ) | ||
| ) | ||
| .filter(has_replicate=True) | ||
| ) | ||
| replicates = PerformanceDatumReplicate.objects.filter( | ||
| performance_datum_id__in=Subquery(datum_with_replicates.values("id")) | ||
| ).values_list("performance_datum_id", "value") | ||
| replicates_map: dict[int, list[float]] = {} | ||
| for datum_id, value in replicates: | ||
| replicates_map.setdefault(datum_id, []).append(value) | ||
|
|
||
| revision_data = {} | ||
| for d in series: | ||
| if not revision_data.get(d.push_id): | ||
| revision_data[d.push_id] = RevisionDatumTest( | ||
| int(time.mktime(d.push_timestamp.timetuple())), d.push_id, [], [] | ||
| ) | ||
| revision_data[d.push_id].values.append(d.value) | ||
| revision_data[d.push_id].replicates.extend(replicates_map.get(d.id, [])) | ||
|
|
||
| data = list(revision_data.values()) | ||
| methods = build_cpd_methods() | ||
| analyzed_series = detect_methods_changes( | ||
| signature, data, methods, replicates_enabled=replicates_enabled | ||
| ) | ||
| analyzed_series = _trim_series_after(analyzed_series, alert_after_ts) | ||
|
|
||
| # Apply voting with configurable parameters | ||
| # min_method_agreement: consensus threshold (absolute number: 3 means 3 methods must agree out of 6 total) | ||
| # detection_index_tolerance: tolerance for matching detections (±2 indices) | ||
| # detection_index_tolerance: tolerance for matching detections (±1 indices) | ||
| vote( | ||
| signature, | ||
| analyzed_series, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this may have been added after my last review (I don't remember seeing it before at least). Just a note for future PRs, please try to keep the scope limited to make the reviews quicker and less error-prone.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure thing, I could still separate the new changes into a separate PR if you want.