#!/usr/bin/env python3
"""
Validate data integrity by querying OpenSearch and checking for gaps/missing messages
"""
import argparse
import base64
import json
import re
import ssl
import sys
from collections import Counter
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen


def query_opensearch(host, port, index, user, password, query, use_ssl=True,
                     scroll=None, is_scroll_request=False, path_prefix=''):
    """Query OpenSearch API"""
    protocol = "https" if use_ssl else "http"
    base = f"{protocol}://{host}:{port}{path_prefix}"

    if is_scroll_request:
        # Scroll continuation request
        url = f"{base}/_search/scroll"
        query_body = {
            "scroll": scroll,
            "scroll_id": query  # query contains the scroll_id
        }
    else:
        # Regular search request
        if scroll:
            # Initial search with scroll - pass scroll as URL parameter
            url = f"{base}/{index}/_search?scroll={scroll}"
        else:
            url = f"{base}/{index}/_search"
        query_body = query

    headers = {
        'Content-Type': 'application/json',
    }
    # Create auth header
    if user and password:
        credentials = base64.b64encode(f"{user}:{password}".encode()).decode()
        headers['Authorization'] = f'Basic {credentials}'

    # Create SSL context that doesn't verify certificates (test clusters
    # commonly use self-signed certs)
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    req = Request(url, data=json.dumps(query_body).encode(), headers=headers, method='POST')
    try:
        with urlopen(req, context=ctx, timeout=30) as response:
            return json.loads(response.read().decode())
    except HTTPError as e:
        print(f"HTTP Error: {e.code} - {e.reason}")
        print(e.read().decode())
        sys.exit(1)
    except URLError as e:
        print(f"URL Error: {e.reason}")
        sys.exit(1)
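# Illustrative calls (a sketch; the host, index, and credentials are
# placeholder values):
#
#   body = {"query": {"match_all": {}}, "size": 1}
#   resp = query_opensearch("localhost", 9200, "my-index", "admin", "admin", body)
#
#   # Scrolled search: the initial request passes scroll=..., continuations
#   # pass the previous _scroll_id in place of the query body.
#   first = query_opensearch("localhost", 9200, "my-index", "admin", "admin",
#                            body, scroll="2m")
#   more = query_opensearch("localhost", 9200, "my-index", "admin", "admin",
#                           first["_scroll_id"], scroll="2m",
#                           is_scroll_request=True)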
def clear_scroll(host, port, scroll_id, user, password, use_ssl=True, path_prefix=''):
    """Clear the scroll context (best effort; errors are ignored)"""
    protocol = "https" if use_ssl else "http"
    url = f"{protocol}://{host}:{port}{path_prefix}/_search/scroll"
    headers = {
        'Content-Type': 'application/json',
    }
    if user and password:
        credentials = base64.b64encode(f"{user}:{password}".encode()).decode()
        headers['Authorization'] = f'Basic {credentials}'

    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    body = {"scroll_id": scroll_id}
    req = Request(url, data=json.dumps(body).encode(), headers=headers, method='DELETE')
    try:
        with urlopen(req, context=ctx, timeout=10):
            pass
    except (HTTPError, URLError, OSError):
        pass  # Ignore errors during cleanup; the scroll context expires on its own
def get_total_count(host, port, index, user, password, use_ssl=True, path_prefix=''):
    """Get total document count in index"""
    query = {
        "query": {"match_all": {}},
        "size": 0,
        "track_total_hits": True
    }
    result = query_opensearch(host, port, index, user, password, query, use_ssl, path_prefix=path_prefix)
    return result['hits']['total']['value']
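# With "track_total_hits": true the response carries an exact total; the
# relevant part of the reply is shaped like (illustrative value):
#
#   {"hits": {"total": {"value": 100000, "relation": "eq"}, "hits": []}}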
def get_message_counters(host, port, index, user, password, use_ssl=True, path_prefix=''):
    """Extract all message counters and document _ids from the index using the scroll API"""
    counters = []
    # Map: message_counter -> list of (_id, @timestamp) for duplicate diagnosis
    counter_to_ids = {}

    # Initial query with scroll (scroll parameter will be in URL)
    query = {
        "query": {"match_all": {}},
        "size": 10000,  # Batch size per scroll request
        "sort": [{"@timestamp": "asc"}, {"_id": "asc"}],  # Stable sort with _id as tiebreaker
        "_source": ["message", "@timestamp"]
    }

    def process_hits(hits):
        for hit in hits:
            source = hit['_source']
            message = source.get('message', '')
            doc_id = hit['_id']
            timestamp = source.get('@timestamp', '')
            match = re.search(r'Test message #(\d+)', message)
            if match:
                counter_val = int(match.group(1))
                counters.append(counter_val)
                if counter_val not in counter_to_ids:
                    counter_to_ids[counter_val] = []
                counter_to_ids[counter_val].append((doc_id, timestamp))

    # Initial search request with scroll
    result = query_opensearch(host, port, index, user, password, query, use_ssl, scroll="2m", path_prefix=path_prefix)
    scroll_id = result.get('_scroll_id')
    hits = result['hits']['hits']
    process_hits(hits)
    total_fetched = len(hits)
    batch_count = 1

    # Continue scrolling until no more results
    while len(hits) > 0:
        # Scroll to next batch
        result = query_opensearch(host, port, index, user, password, scroll_id, use_ssl,
                                  scroll="2m", is_scroll_request=True, path_prefix=path_prefix)
        scroll_id = result.get('_scroll_id')
        hits = result['hits']['hits']
        if len(hits) == 0:
            break
        batch_count += 1
        total_fetched += len(hits)
        process_hits(hits)
        # Print progress for large datasets
        if batch_count % 10 == 0:
            print(f"  Fetched {total_fetched:,} documents, extracted {len(counters):,} counters...", end='\r')

    # Clear scroll context
    if scroll_id:
        clear_scroll(host, port, scroll_id, user, password, use_ssl, path_prefix=path_prefix)

    if batch_count > 1:
        print(f"  Fetched {total_fetched:,} documents, extracted {len(counters):,} counters          ")

    return sorted(counters), counter_to_ids
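# The extraction relies on the Fluent Bit test payloads containing lines of
# the form "Test message #<N>"; for example, a document whose message field is
# "Test message #42" yields counter 42, keyed to its _id and @timestamp.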
def validate_integrity(counters, expected_count=None):
    """Validate message sequence integrity"""
    results = {
        'total_messages': len(counters),
        'expected_count': expected_count,
        'unique_messages': 0,
        'count_match': False,
        'sequence_valid': True,
        'gaps': [],
        'duplicates': {},
        'total_duplicates': 0,
        'first_counter': None,
        'last_counter': None,
    }
    if not counters:
        return results

    results['first_counter'] = counters[0]
    results['last_counter'] = counters[-1]

    # Count occurrences of each counter to detect duplicates
    counter_counts = Counter(counters)
    unique_counters = sorted(counter_counts.keys())
    results['unique_messages'] = len(unique_counters)

    # Check count against expected (using unique messages)
    if expected_count:
        results['count_match'] = (results['unique_messages'] == expected_count)

    # Check for duplicates
    for val, count in counter_counts.items():
        if count > 1:
            results['duplicates'][val] = count
            results['total_duplicates'] += count - 1  # extra copies
            results['sequence_valid'] = False

    # Check for gaps using unique sorted counters
    prev = unique_counters[0] - 1
    for counter in unique_counters:
        if counter != prev + 1:
            gap_start = prev + 1
            gap_end = counter - 1
            results['gaps'].append((gap_start, gap_end))
            results['sequence_valid'] = False
        prev = counter

    return results
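# Worked example: validate_integrity([1, 2, 2, 5], expected_count=5) reports
# total_messages=4, unique_messages=3, duplicates={2: 2} (one extra copy),
# one gap (3, 4), sequence_valid=False, and count_match=False since 3 != 5.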
def main():
    parser = argparse.ArgumentParser(description='Validate Fluent Bit data integrity in OpenSearch')
    parser.add_argument('--host', required=True, help='OpenSearch host')
    parser.add_argument('--port', type=int, default=9200, help='OpenSearch port')
    parser.add_argument('--index', required=True, help='Index name to validate')
    parser.add_argument('--user', help='OpenSearch username')
    parser.add_argument('--password', help='OpenSearch password')
    parser.add_argument('--expected-count', type=int, help='Expected message count')
    parser.add_argument('--no-ssl', action='store_true', help='Disable SSL')
    parser.add_argument('--path-prefix', default='', help='URL path prefix (e.g. /os for a reverse proxy)')
    parser.add_argument('--json', action='store_true', help='Output results as JSON')
    args = parser.parse_args()

    use_ssl = not args.no_ssl

    if not args.json:
        print(f"Validating data in OpenSearch index: {args.index}")
        print(f"Host: {args.host}:{args.port}")
        print()

    # Get total count
    total = get_total_count(args.host, args.port, args.index, args.user, args.password, use_ssl, path_prefix=args.path_prefix)
    if not args.json:
        print(f"Total documents in index: {total}")
        if args.expected_count:
            print(f"Expected count: {args.expected_count}")
        print()

    # Extract counters
    if not args.json:
        print("Extracting message counters...")
    counters, counter_to_ids = get_message_counters(args.host, args.port, args.index, args.user, args.password, use_ssl, path_prefix=args.path_prefix)

    # Validate
    results = validate_integrity(counters, args.expected_count)

    if args.json:
        # Convert duplicates dict keys to strings for JSON serialization
        json_results = dict(results)
        json_results['duplicates'] = {str(k): v for k, v in results['duplicates'].items()}
        print(json.dumps(json_results, indent=2))
    else:
        print()
        print("=== Validation Results ===")
        print(f"Total documents in index: {results['total_messages']}")
        print(f"Unique messages: {results['unique_messages']}")
        if results['first_counter'] is not None:
            print(f"Counter range: {results['first_counter']} to {results['last_counter']}")
        if results['expected_count']:
            status = "✓ PASS" if results['count_match'] else "✗ FAIL"
            print(f"Unique count vs expected ({results['expected_count']}): {status}")

        if results['total_duplicates'] > 0:
            dup_ratio = results['total_messages'] / results['unique_messages'] if results['unique_messages'] > 0 else 0
            print(f"\nDuplicates: {results['total_duplicates']:,} extra copies ({dup_ratio:.1f}x average duplication)")
            # Show the most duplicated counters
            sorted_dups = sorted(results['duplicates'].items(), key=lambda x: -x[1])
            print(f"Most duplicated values (showing top 10 of {len(results['duplicates']):,}):")
            for val, count in sorted_dups[:10]:
                print(f"  - Counter {val}: {count} copies")
            if len(sorted_dups) > 10:
                print(f"  ... and {len(sorted_dups) - 10} more")

            # Diagnose Generate_ID effectiveness: do duplicates share an _id?
            print("\n--- Generate_ID Diagnosis ---")
            same_id_count = 0
            diff_id_count = 0
            sample_dups = sorted_dups[:5]  # Examine the top 5 duplicated counters
            for val, count in sample_dups:
                entries = counter_to_ids.get(val, [])
                ids = [e[0] for e in entries]
                unique_ids = set(ids)
                if len(unique_ids) == 1:
                    same_id_count += 1
                else:
                    diff_id_count += 1
                print(f"  Counter {val} ({count} copies):")
                for doc_id, ts in entries[:4]:  # Show up to 4
                    print(f"    _id={doc_id} @timestamp={ts}")
                if len(entries) > 4:
                    print(f"    ... and {len(entries) - 4} more")
            if diff_id_count > 0:
                print(f"\n  → {diff_id_count}/{len(sample_dups)} sampled duplicates have DIFFERENT _id values")
                print("  → Generate_ID is NOT producing deterministic IDs for retried records")
                print("  → Possible cause: record content changes between the original send and the retry")
            elif same_id_count > 0:
                print(f"\n  → {same_id_count}/{len(sample_dups)} sampled duplicates share the SAME _id")
                print("  → Generate_ID IS deterministic, but OpenSearch stored duplicates anyway")
                print("  → Possible cause: data stream routing to different backing indices")

        if results['sequence_valid']:
            print("\nSequence integrity: ✓ PASS (no gaps or duplicates)")
        else:
            if results['gaps']:
                total_missing = sum(gap_end - gap_start + 1 for gap_start, gap_end in results['gaps'])
                print(f"\nGaps found ({len(results['gaps'])} gaps, {total_missing:,} missing counters):")
                for gap_start, gap_end in results['gaps'][:10]:  # Show the first 10
                    if gap_start == gap_end:
                        print(f"  - Missing: {gap_start}")
                    else:
                        print(f"  - Missing: {gap_start} to {gap_end} ({gap_end - gap_start + 1} values)")
                if len(results['gaps']) > 10:
                    print(f"  ... and {len(results['gaps']) - 10} more")
            elif results['total_duplicates'] > 0 and not results['gaps']:
                print("\nSequence: ✓ No gaps (all counters present), but duplicates exist")

    print()
    # Exit with the appropriate code
    if results['sequence_valid'] and (not results['expected_count'] or results['count_match']):
        print("✓ All validation checks passed!")
        sys.exit(0)
    else:
        print("✗ Validation failed!")
        sys.exit(1)
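# With --json, the results dictionary is printed directly; the output is
# shaped like (values here are illustrative):
#
#   {
#     "total_messages": 100005,
#     "expected_count": 100000,
#     "unique_messages": 100000,
#     "count_match": true,
#     "sequence_valid": false,
#     "gaps": [],
#     "duplicates": {"42": 2, ...},
#     "total_duplicates": 5,
#     "first_counter": 1,
#     "last_counter": 100000
#   }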
if __name__ == '__main__':
    main()