Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions tests/perfalert/test_methods_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from treeherder.model.models import Push
from treeherder.perf.alerts import (
build_cpd_methods,
create_alert,
detect_methods_changes,
generate_new_test_alerts_in_series,
Expand Down Expand Up @@ -572,9 +571,7 @@ def test_detection_tolerance_deduplication_guard_suppresses_nearby_duplicate_ale
int(time.mktime(d.push_timestamp.timetuple())), d.push_id, [], []
)
revision_data[d.push_id].values.append(d.value)
analyzed_series = detect_methods_changes(
test_perf_signature, list(revision_data.values()), build_cpd_methods()
)
analyzed_series = detect_methods_changes(test_perf_signature, list(revision_data.values()))

_alerts_mod = sys.modules["treeherder.perf.alerts"]
original = create_alert
Expand Down
123 changes: 72 additions & 51 deletions treeherder/perf/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ def build_cpd_methods():
return methods


# Built once at import time; use this everywhere instead of calling build_cpd_methods() repeatedly.
CPD_METHODS = build_cpd_methods()


def name_voting_strategy(
voting_strategy,
min_method_agreement,
Expand All @@ -324,9 +328,9 @@ def name_voting_strategy(
return voting_strategy_naming


def detect_methods_changes(signature, data, replicates_enabled=False):
    """
    Run every configured change-point-detection method over the series.

    Each method in the module-level CPD_METHODS registry receives the output
    of the previous one, so detection annotations accumulate on the series as
    it passes through the chain.

    :param signature: performance signature the series belongs to
    :param data: list of revision datum objects ordered by push
    :param replicates_enabled: forwarded to each method's detect_changes
    :return: the series annotated by all detection methods
    """
    analyzed_series = data
    for method_impl in CPD_METHODS.values():
        analyzed_series = method_impl.detect_changes(analyzed_series, signature, replicates_enabled)
    return analyzed_series

Expand Down Expand Up @@ -378,7 +382,7 @@ def get_methods_detecting_at_index(analyzed_series, index, detection_index_toler
"""
Get detection data for all methods within detection tolerance margin of the given index.
"""
all_methods = build_cpd_methods().keys()
all_methods = CPD_METHODS.keys()
methods_data = {}

# Check within the detection tolerance
Expand Down Expand Up @@ -451,14 +455,13 @@ def priority_voting_strategy(analyzed_series, min_method_agreement=3, detection_
# Track which indices we've already added detections for (to avoid duplicates
# in both Phase 1 and the fallback equal voting strategy)
alerted_indices = set()
last_alerted_index = -detection_index_tolerance - 1
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this may have been added after my last review (I don't remember seeing it before at least). Just a note for future PRs, please try to keep the scope limited to make the reviews quicker and less error-prone.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thing, I could still separate the new changes into a separate PR if you want.

Copy link
Copy Markdown
Collaborator

@gmierz gmierz Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, why is there a negative sign now for detection_index_tolerance? This seems convoluted.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, the negative sign is there so the very first data point doesn't get skipped: it sets the starting value far enough back that the proximity check can never fire before any real alert has been created.


# Phase 1: Student detections (no detection tolerance)
for i in range(1, len(analyzed_series)):
# This prevents duplicate alerts from being raised for the same underlying change event
# since different detection methods may pinpoint it at slightly different indices.
if any(
abs(i - alerted_idx) <= detection_index_tolerance for alerted_idx in alerted_indices
):
if i - last_alerted_index <= detection_index_tolerance:
continue

cur = analyzed_series[i]
Expand All @@ -471,6 +474,7 @@ def priority_voting_strategy(analyzed_series, min_method_agreement=3, detection_

detections.append((i, prev_index, methods_data))
alerted_indices.add(i)
last_alerted_index = i

# Phase 2: Fall back to equal voting strategy for indices not caught by Student
# Student won't influence the vote here since change_detected["student"]
Expand Down Expand Up @@ -500,13 +504,12 @@ def equal_voting_strategy(
return []

alerted_indices = alerted_indices if alerted_indices is not None else set()
last_alerted_index = max(alerted_indices) if alerted_indices else -detection_index_tolerance - 1
detections = []

for i in range(1, len(analyzed_series)):
# Skip if we've already created an alert near this index
if any(
abs(i - alerted_idx) <= detection_index_tolerance for alerted_idx in alerted_indices
):
if i - last_alerted_index <= detection_index_tolerance:
continue

# Check how many methods detected a change within the detection tolerance
Expand All @@ -526,6 +529,7 @@ def equal_voting_strategy(
if weighted_index is not None:
detections.append((weighted_index, prev_index, methods_detecting_data))
alerted_indices.add(weighted_index)
last_alerted_index = weighted_index

return detections

Expand All @@ -547,7 +551,7 @@ def create_alert(
application=signature.application,
)

all_methods = build_cpd_methods().keys()
all_methods = CPD_METHODS.keys()
confidences = {}

for method in all_methods:
Expand Down Expand Up @@ -645,13 +649,66 @@ def create_alert(
)


def prepare_series_for_detection(signature, replicates_enabled=REPLICATES):
    """
    Fetches series data from the DB and runs all CPD detectors over it.

    Builds one RevisionDatumTest per push — aggregating every datum value and
    any per-datum replicate values within the alerts-max-age window — and
    returns the series after every detection method has annotated it.
    """
    cutoff = datetime.now() - settings.PERFHERDER_ALERTS_MAX_AGE
    series = PerformanceDatum.objects.filter(signature=signature, push_timestamp__gte=cutoff)

    # Datums in the window that have at least one replicate attached.
    with_replicates = (
        PerformanceDatum.objects.filter(
            signature=signature,
            repository=signature.repository,
            push_timestamp__gte=cutoff,
        )
        .annotate(
            has_replicate=Exists(
                PerformanceDatumReplicate.objects.filter(performance_datum_id=OuterRef("pk"))
            )
        )
        .filter(has_replicate=True)
    )
    replicate_rows = PerformanceDatumReplicate.objects.filter(
        performance_datum_id__in=Subquery(with_replicates.values("id"))
    ).values_list("performance_datum_id", "value")

    # Group replicate values by their parent datum id for O(1) lookup below.
    replicates_by_datum: dict[int, list[float]] = {}
    for datum_id, value in replicate_rows:
        replicates_by_datum.setdefault(datum_id, []).append(value)

    # One RevisionDatumTest per push, collecting every value and replicate.
    per_push = {}
    for datum in series:
        if not per_push.get(datum.push_id):
            per_push[datum.push_id] = RevisionDatumTest(
                int(time.mktime(datum.push_timestamp.timetuple())), datum.push_id, [], []
            )
        entry = per_push[datum.push_id]
        entry.values.append(datum.value)
        entry.replicates.extend(replicates_by_datum.get(datum.id, []))

    return detect_methods_changes(
        signature, list(per_push.values()), replicates_enabled=replicates_enabled
    )


def _trim_series_after(analyzed_series, after_ts):
"""
Return a new list containing only the entries whose push_timestamp falls
strictly after after_ts. The original list is never mutated, keeping it
safe to reuse across multiple voting-strategy calls.
"""
return [d for d in analyzed_series if datetime.utcfromtimestamp(d.push_timestamp) > after_ts]


def generate_new_test_alerts_in_series(
signature,
voting_strategy=VOTING_STRATEGY,
min_method_agreement=MIN_METHOD_AGREEMENT,
detection_index_tolerance=DETECTION_INDEX_TOLERANCE,
replicates_enabled=REPLICATES,
analyzed_series=None,
):
"""
Generate test alerts for a signature using the given voting strategy.
"""
detection_method_name = name_voting_strategy(
voting_strategy,
min_method_agreement,
Expand All @@ -663,10 +720,9 @@ def generate_new_test_alerts_in_series(
# (2) the alerts max age
# use whichever is newer
with transaction.atomic():
max_alert_age = alert_after_ts = datetime.now() - settings.PERFHERDER_ALERTS_MAX_AGE
series = PerformanceDatum.objects.filter(
signature=signature, push_timestamp__gte=max_alert_age
)
if analyzed_series is None:
analyzed_series = prepare_series_for_detection(signature, replicates_enabled)
alert_after_ts = datetime.now() - settings.PERFHERDER_ALERTS_MAX_AGE
latest_alert_timestamp = (
PerformanceAlertTesting.objects.filter(
series_signature=signature, detection_method=detection_method_name
Expand All @@ -677,48 +733,13 @@ def generate_new_test_alerts_in_series(
)
if latest_alert_timestamp:
latest_ts = latest_alert_timestamp[0]
series = series.filter(push_timestamp__gt=latest_ts)
if latest_ts > alert_after_ts:
alert_after_ts = latest_ts

datum_with_replicates = (
PerformanceDatum.objects.filter(
signature=signature,
repository=signature.repository,
push_timestamp__gte=alert_after_ts,
)
.annotate(
has_replicate=Exists(
PerformanceDatumReplicate.objects.filter(performance_datum_id=OuterRef("pk"))
)
)
.filter(has_replicate=True)
)
replicates = PerformanceDatumReplicate.objects.filter(
performance_datum_id__in=Subquery(datum_with_replicates.values("id"))
).values_list("performance_datum_id", "value")
replicates_map: dict[int, list[float]] = {}
for datum_id, value in replicates:
replicates_map.setdefault(datum_id, []).append(value)

revision_data = {}
for d in series:
if not revision_data.get(d.push_id):
revision_data[d.push_id] = RevisionDatumTest(
int(time.mktime(d.push_timestamp.timetuple())), d.push_id, [], []
)
revision_data[d.push_id].values.append(d.value)
revision_data[d.push_id].replicates.extend(replicates_map.get(d.id, []))

data = list(revision_data.values())
methods = build_cpd_methods()
analyzed_series = detect_methods_changes(
signature, data, methods, replicates_enabled=replicates_enabled
)
analyzed_series = _trim_series_after(analyzed_series, alert_after_ts)

# Apply voting with configurable parameters
# min_method_agreement: consensus threshold (absolute number: 3 means 3 methods must agree out of 6 total)
# detection_index_tolerance: tolerance for matching detections (±2 indices)
# detection_index_tolerance: tolerance for matching detections (±1 index)
vote(
signature,
analyzed_series,
Expand Down
5 changes: 5 additions & 0 deletions treeherder/perf/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from treeherder.perf.alerts import (
generate_new_alerts_in_series,
generate_new_test_alerts_in_series,
prepare_series_for_detection,
)
from treeherder.perf.models import PerformanceSignature
from treeherder.workers.task import retryable_task
Expand All @@ -20,19 +21,23 @@ def generate_alerts(signature_id):
signature = PerformanceSignature.objects.get(id=signature_id)
generate_new_alerts_in_series(signature)
try:
analyzed_series = prepare_series_for_detection(signature, replicates_enabled=False)

generate_new_test_alerts_in_series(
signature,
voting_strategy="priority",
min_method_agreement=3,
detection_index_tolerance=1,
replicates_enabled=False,
analyzed_series=analyzed_series,
)
generate_new_test_alerts_in_series(
signature,
voting_strategy="equal",
min_method_agreement=3,
detection_index_tolerance=1,
replicates_enabled=False,
analyzed_series=analyzed_series,
)
except Exception as e:
logger.exception(
Expand Down