-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcontext.py
More file actions
99 lines (82 loc) · 2.69 KB
/
context.py
File metadata and controls
99 lines (82 loc) · 2.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from dataclasses import dataclass
from datetime import datetime, timedelta
from datetime import timezone
from typing import Dict, List, Tuple, Optional
from detector import AnomalyV2
from store import PatternStoreV2, PatternKey
@dataclass(frozen=True)
class DeployEvent:
    """Immutable record of a single deploy of a service.

    The dataclass is frozen: fields cannot be reassigned after construction,
    and instances compare (and hash) by field values.
    """

    # Service name; matched against the first element of an anomaly's key
    # when looking for a deploy that coincides with the anomaly.
    service: str
    # Version label of the deploy (not interpreted anywhere in this file).
    version: str
    # Moment of the deploy; compared against the anomaly context window.
    timestamp: datetime
@dataclass(frozen=True)
class AnomalyContextV2:
    """Snapshot of the activity surrounding one anomaly.

    Built by ``ContextBuilderV2.build``. The dataclass is frozen, so fields
    cannot be reassigned, but the dict and list values remain mutable
    containers (and make instances unhashable).
    """

    # The anomaly this context describes.
    anomaly: AnomalyV2
    # Start of the look-back window (``window_end`` minus the builder's
    # ``context_window``).
    window_start: datetime
    # End of the look-back window (the anomaly's ``last_seen``).
    window_end: datetime
    # Counts, keyed by pattern key, for other patterns whose key shares the
    # anomaly's first key element (the service) within the window.
    related_patterns: Dict[PatternKey, int]
    # Total count per level across the activity returned for the window.
    level_breakdown: Dict[str, int]
    # First supplied deploy of the anomaly's service that falls inside the
    # window, or ``None`` when there is none.
    deploy_event: Optional[DeployEvent]
    # Placeholder for request correlation; the builder currently always
    # stores an empty list here.
    request_ids: List[str]
class ContextBuilderV2:
    """Assemble an ``AnomalyContextV2`` for an anomaly from stored activity.

    The builder looks back ``context_window`` from the anomaly's
    ``last_seen`` and summarises what the pattern store reports for that
    span, optionally correlating it with a deploy of the same service.
    """

    def __init__(
        self,
        store: PatternStoreV2,
        context_window: timedelta = timedelta(minutes=5),
    ) -> None:
        """Create a builder.

        Args:
            store: Pattern store queried through ``get_activity_window``.
            context_window: Length of the look-back window that ends at the
                anomaly's ``last_seen``. Defaults to five minutes.
        """
        self.store = store
        self.context_window = context_window

    def build(
        self,
        anomaly: AnomalyV2,
        deploy_events: Optional[List[DeployEvent]] = None,
    ) -> AnomalyContextV2:
        """Build the context for ``anomaly``.

        Args:
            anomaly: The anomaly to describe. Its ``last_seen`` fixes the end
                of the window and ``key[0]`` is treated as its service.
            deploy_events: Known deploys to correlate against; ``None`` is
                treated as an empty list.

        Returns:
            An ``AnomalyContextV2`` covering
            ``[last_seen - context_window, last_seen]``.
        """
        window_end = anomaly.last_seen
        window_start = window_end - self.context_window

        # ---- Aggregate activity in window ----
        # Used below as a mapping of 3-element pattern keys to counts.
        activity = self.store.get_activity_window(
            since=window_start,
            until=window_end,
        )

        # ---- Related patterns (same service, different template) ----
        service = anomaly.key[0]
        related: Dict[PatternKey, int] = {
            key: count
            for key, count in activity.items()
            if key[0] == service and key != anomaly.key
        }

        # ---- Level breakdown ----
        # NOTE(review): this sums over every service present in the window,
        # not only the anomaly's service -- confirm that is intended.
        level_breakdown: Dict[str, int] = {}
        for (_service, level, _template), count in activity.items():
            level_breakdown[level] = level_breakdown.get(level, 0) + count

        # ---- Deploy correlation ----
        deploy_event = self._find_deploy(
            anomaly,
            deploy_events or [],
            window_start,
            window_end,
        )

        # ---- Request IDs (best-effort, placeholder) ----
        # NOTE: real request correlation comes later
        request_ids: List[str] = []

        return AnomalyContextV2(
            anomaly=anomaly,
            window_start=window_start,
            window_end=window_end,
            related_patterns=related,
            level_breakdown=level_breakdown,
            deploy_event=deploy_event,
            request_ids=request_ids,
        )

    def _find_deploy(
        self,
        anomaly: AnomalyV2,
        deploy_events: List[DeployEvent],
        start: datetime,
        end: datetime,
    ) -> Optional[DeployEvent]:
        """Return the first deploy of the anomaly's service inside the window.

        Events are scanned in the order given and both bounds are inclusive.
        When several deploys match, the earliest one in the list wins (which
        need not be the most recent by timestamp).

        Args:
            anomaly: Anomaly whose ``key[0]`` names the service to match.
            deploy_events: Candidate deploys.
            start: Inclusive lower bound for the deploy timestamp.
            end: Inclusive upper bound for the deploy timestamp.

        Returns:
            The first matching ``DeployEvent``, or ``None`` if none matches.
        """
        service = anomaly.key[0]
        return next(
            (
                deploy
                for deploy in deploy_events
                if deploy.service == service
                and start <= deploy.timestamp <= end
            ),
            None,
        )