-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstore.py
More file actions
131 lines (102 loc) · 3.58 KB
/
store.py
File metadata and controls
131 lines (102 loc) · 3.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from collections import defaultdict, deque
from dataclasses import dataclass
from datetime import datetime, timedelta
from datetime import timezone
from typing import Dict, List, Tuple
from v3.ingest import LogEvent
# Identity of one log pattern: (service, level, template).
PatternKey = Tuple[str, str, str]

# Per-level multiplier applied to raw event counts; the level is the second
# element of a PatternKey.
LEVEL_WEIGHTS: Dict[str, float] = {
    "ERROR": 5.0,
    "WARN": 2.0,
    "INFO": 1.0,
    "DEBUG": 0.5,
}
@dataclass
class PatternStats:
    """Mutable lifetime counters kept for a single pattern key."""

    # Number of events recorded for the pattern.
    total_count: int
    # First timestamp recorded for the pattern.
    first_seen: datetime
    # Latest timestamp recorded for the pattern.
    last_seen: datetime
class PatternStoreV2:
    """Sliding-window store of per-pattern event counts.

    Every pattern key ``(service, level, template)`` owns a deque of
    ``(bucket_start, count)`` tuples kept in ascending ``bucket_start`` order,
    plus a lifetime ``PatternStats`` record.  On each ``add`` the buckets of
    that key which start more than ``window_size`` before the newest event
    seen for the key are dropped.

    Bucket starts are timezone-aware UTC datetimes, so timestamps handed to
    this class must be timezone-aware too: Python refuses to order a naive
    datetime against an aware one.
    """

    # Origin that bucket boundaries are aligned to.
    _EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

    def __init__(self, window_size: timedelta, bucket_size: timedelta):
        """Create an empty store.

        Args:
            window_size: How far back from the newest event buckets are kept.
            bucket_size: Width of one counting bucket; must be positive.

        Raises:
            ValueError: If ``bucket_size`` is zero or negative.
        """
        if bucket_size <= timedelta(0):
            # A non-positive width cannot partition the timeline.
            raise ValueError("bucket_size must be a positive duration")

        self.window_size = window_size
        self.bucket_size = bucket_size

        # key -> deque[(bucket_start, count)], ascending by bucket_start
        self._buckets: Dict[PatternKey, deque[Tuple[datetime, int]]] = defaultdict(deque)

        # key -> stats
        self._stats: Dict[PatternKey, PatternStats] = {}

    # ---------- Internal helpers ----------

    def _bucket_start(self, ts: datetime) -> datetime:
        """Return the UTC start of the bucket that contains ``ts``."""
        # Pure timedelta arithmetic keeps sub-second and fractional bucket
        # sizes exact; truncating the size to whole seconds would turn a
        # sub-second bucket into a modulo-by-zero.
        elapsed = ts.astimezone(timezone.utc) - self._EPOCH
        return self._EPOCH + (elapsed // self.bucket_size) * self.bucket_size

    def _evict_old(self, key: PatternKey, now: datetime):
        """Drop the buckets of ``key`` that start before ``now - window_size``."""
        # .get() instead of indexing: indexing the defaultdict would register
        # an empty pattern for a key that was never added.
        buckets = self._buckets.get(key)
        if not buckets:
            return

        cutoff = now - self.window_size
        # Buckets are sorted, so everything stale sits at the left end.
        while buckets and buckets[0][0] < cutoff:
            buckets.popleft()

    def _increment_bucket(self, key: PatternKey, bucket_ts: datetime):
        """Add one event to the bucket starting at ``bucket_ts``.

        The bucket is created in its sorted position when it does not exist
        yet, so late (out-of-order) events neither duplicate a bucket nor
        break the ordering that eviction relies on.
        """
        buckets = self._buckets[key]

        # Scan from the newest end: in-order traffic stops immediately and
        # late events are normally only a few buckets behind.
        pos = len(buckets)
        while pos and buckets[pos - 1][0] > bucket_ts:
            pos -= 1

        if pos and buckets[pos - 1][0] == bucket_ts:
            start, count = buckets[pos - 1]
            buckets[pos - 1] = (start, count + 1)
        else:
            buckets.insert(pos, (bucket_ts, 1))

    # ---------- Write API ----------

    def add(self, event: LogEvent):
        """Record one event.

        Reads ``event.service``, ``event.level``, ``event.template`` and
        ``event.timestamp``.

        Raises:
            TypeError: If ``event.timestamp`` is naive.  It could not be
                compared with the UTC-aware bucket starts, so it is rejected
                before any state is modified.
        """
        ts = event.timestamp
        if ts.tzinfo is None or ts.utcoffset() is None:
            raise TypeError("event.timestamp must be timezone-aware")

        key: PatternKey = (event.service, event.level, event.template)

        self._increment_bucket(key, self._bucket_start(ts))
        self._update_stats(key, ts)

        # Anchor the window on the newest event seen for this key so that a
        # late event cannot move the cutoff backwards.
        self._evict_old(key, self._stats[key].last_seen)

    def _update_stats(self, key: PatternKey, ts: datetime):
        """Fold one event at ``ts`` into the lifetime stats of ``key``."""
        stats = self._stats.get(key)
        if stats is None:
            self._stats[key] = PatternStats(
                total_count=1,
                first_seen=ts,
                last_seen=ts,
            )
            return

        stats.total_count += 1
        # Late events may only widen the seen range, never shrink it.
        if ts < stats.first_seen:
            stats.first_seen = ts
        if ts >= stats.last_seen:
            stats.last_seen = ts

    # ---------- Read APIs (V2 FIX) ----------

    def get_patterns(self) -> List[PatternKey]:
        """Return every pattern key that currently has a bucket deque."""
        return list(self._buckets)

    def get_buckets(self, key: PatternKey) -> List[Tuple[datetime, int]]:
        """Return a copy of the ``(bucket_start, count)`` list for ``key``.

        Unknown keys yield an empty list and are not registered.
        """
        return list(self._buckets.get(key, []))

    def get_stats(self, key: PatternKey) -> PatternStats:
        """Return the lifetime stats of ``key``.

        Raises:
            KeyError: If no event was ever added for ``key``.
        """
        return self._stats[key]

    def get_weighted_count(
        self,
        key: PatternKey,
        since: datetime,
    ) -> float:
        """
        Returns weighted count since given timestamp.
        Used by anomaly detector.

        Only buckets whose start is at or after ``since`` are counted, so a
        bucket that began before ``since`` is excluded entirely.  The weight
        is looked up in ``LEVEL_WEIGHTS`` by the key's level and defaults to
        1.0 for unknown levels.
        """
        weight = LEVEL_WEIGHTS.get(key[1], 1.0)
        hits = sum(
            count
            for start, count in self._buckets.get(key, [])
            if start >= since
        )
        return float(weight * hits)

    def get_activity_window(
        self,
        since: datetime,
        until: datetime,
    ) -> Dict[PatternKey, int]:
        """
        Aggregate raw counts for all patterns in a time window.
        Used by context builder.

        A bucket counts when its start lies in ``[since, until]`` (both ends
        inclusive).  Patterns with no events in the window are omitted.
        """
        activity: Dict[PatternKey, int] = {}
        for key, buckets in self._buckets.items():
            total = sum(
                count for start, count in buckets if since <= start <= until
            )
            if total > 0:
                activity[key] = total
        return activity