-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdetector.py
More file actions
122 lines (102 loc) · 3.69 KB
/
detector.py
File metadata and controls
122 lines (102 loc) · 3.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from dataclasses import dataclass
from datetime import datetime, timedelta
from datetime import timezone
from typing import List, Optional
from store import PatternStoreV2, PatternKey
@dataclass(frozen=True)
class AnomalyV2:
key: PatternKey
reason: str # spike | new_pattern
severity: float
recent_weighted: float
baseline_weighted: float
first_seen: datetime
last_seen: datetime
@dataclass(frozen=True)
class NearMiss:
key: PatternKey
recent_weighted: float
baseline_weighted: float
threshold: float
class AnomalyDetectorV2:
def __init__(
self,
store: PatternStoreV2,
recent_window: timedelta,
spike_multiplier: float = 5.0,
min_baseline: float = 5.0,
track_near_miss: bool = True,
):
self.store = store
self.recent_window = recent_window
self.spike_multiplier = spike_multiplier
self.min_baseline = min_baseline
self.track_near_miss = track_near_miss
def detect(
self,
now: datetime,
) -> tuple[List[AnomalyV2], List[NearMiss]]:
anomalies: List[AnomalyV2] = []
near_misses: List[NearMiss] = []
recent_cutoff = now - self.recent_window
for key in self.store.get_patterns():
buckets = self.store.get_buckets(key)
if not buckets:
continue
recent = self.store.get_weighted_count(key, recent_cutoff)
# baseline = everything before recent window
baseline_total = 0.0
baseline_buckets = 0
for ts, count in buckets:
if ts < recent_cutoff:
baseline_total += count
baseline_buckets += 1
baseline_avg = (
baseline_total / baseline_buckets
if baseline_buckets > 0
else 0.0
)
stats = self.store.get_stats(key)
# ---- New pattern ----
level = key[1]
if level in {"INFO", "DEBUG"}:
continue
if baseline_avg == 0.0 and recent > 0:
anomalies.append(
AnomalyV2(
key=key,
reason="new_pattern",
severity=recent,
recent_weighted=recent,
baseline_weighted=0.0,
first_seen=stats.first_seen,
last_seen=stats.last_seen,
)
)
continue
# ---- Spike detection ----
if baseline_avg >= self.min_baseline:
threshold = baseline_avg * self.spike_multiplier
if recent >= threshold:
anomalies.append(
AnomalyV2(
key=key,
reason="spike",
severity=recent / baseline_avg,
recent_weighted=recent,
baseline_weighted=baseline_avg,
first_seen=stats.first_seen,
last_seen=stats.last_seen,
)
)
elif self.track_near_miss and recent >= threshold * 0.7:
near_misses.append(
NearMiss(
key=key,
recent_weighted=recent,
baseline_weighted=baseline_avg,
threshold=threshold,
)
)
anomalies.sort(key=lambda a: a.severity, reverse=True)
return anomalies, near_misses