-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathes2os.sh
More file actions
1483 lines (1251 loc) · 54.3 KB
/
es2os.sh
File metadata and controls
1483 lines (1251 loc) · 54.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/bin/bash
# Exit immediately if a command exits with a non-zero status
set -e
# Function to set up the workstation with required applications
# Install Logstash 7.13.4 plus the OpenSearch output plugin on this host.
# Requires sudo; network access to artifacts.elastic.co.
setup() {
  echo "Setting up the workstation..."
  # Import the GPG key for Elasticsearch.
  # NOTE(review): apt-key is deprecated on current Debian/Ubuntu; consider a
  # keyring file under /etc/apt/keyrings with a signed-by source entry.
  wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
  # Install transport support and the helper tools the script relies on.
  sudo apt-get install -y apt-transport-https jq curl net-tools
  # Write (not append) the source list: the previous `tee -a` added a
  # duplicate repo line on every rerun of setup.
  echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-7.x.list
  # Update package list and install the pinned Logstash version.
  sudo apt-get update
  echo "Installing Logstash version 7.13.4-1..."
  sudo apt-get install -y logstash=1:7.13.4-1
  # Install the OpenSearch output plugin for Logstash.
  echo "Installing Logstash OpenSearch plugin..."
  sudo /usr/share/logstash/bin/logstash-plugin install logstash-output-opensearch
  # Verify plugin installation.
  sudo /usr/share/logstash/bin/logstash-plugin list | grep opensearch && echo "OpenSearch plugin installed successfully."
  echo "Setup complete."
}
# Load environment variables and set defaults
# Load overrides from an env file (default ./env.sh), apply defaults for every
# tunable, and create the output directory tree.
# Arguments: $1 - optional path to the env file.
setup_variables() {
  local env_file_path="${1:-./env.sh}"
  # Source overrides when the env file exists; otherwise run on defaults.
  if [ -f "$env_file_path" ]; then
    echo "Loading environment variables from $env_file_path..."
    source "$env_file_path"
  else
    echo "Warning: Environment file $env_file_path not found. Using default values."
  fi
  # Instance details.
  INSTANCE_COUNT="${INSTANCE_COUNT:-1}"
  INSTANCE_ID="${INSTANCE_ID:-1}"
  # Elasticsearch / Kibana connection defaults.
  ES_ENDPOINT="${ES_HOST:-https://es.la.local:9200}"
  KB_ENDPOINT="${KB_HOST:-https://kb.la.local:5601}"
  ES_USERNAME="${ES_USER:-elastic}"
  ES_PASSWORD="${ES_PASS:-default_elastic_password}"
  ES_SSL="${ES_SSL:-true}"
  ES_CA_FILE="${ES_CA_FILE:-}"
  ES_BATCH_SIZE="${ES_BATCH_SIZE:-2000}"
  DATAVIEW_API_INSECURE="${DATAVIEW_API_INSECURE:-true}"
  # OpenSearch connection defaults.
  OS_ENDPOINT="${OS_HOST:-https://os.la.local:9200}"
  OS_USERNAME="${OS_USER:-admin}"
  OS_PASSWORD="${OS_PASS:-default_admin_password}"
  OS_SSL="${OS_SSL:-true}"
  OS_CA_FILE="${OS_CA_FILE:-}"
  OS_SSL_CERT_VERIFY="${OS_SSL_CERT_VERIFY:-false}"
  # Optional date-range filter applied to the migration query.
  DATE_FIELD_KEY="${DATE_FIELD_KEY:-@timestamp}"
  FILTER_DATE_FROM="${FILTER_DATE_FROM:-}"
  FILTER_DATE_TO="${FILTER_DATE_TO:-}"
  # Output layout: resolve every path first, then create all directories.
  OUTPUT_DIR="${OUTPUT_DIR:-./output_files}"
  DATAVIEW_DIR="$OUTPUT_DIR/dataviews"
  DATAVIEW_FILE="$DATAVIEW_DIR/dataviews.json"
  DATAVIEW_REPORT_FILE="$DATAVIEW_DIR/dataviews_migration_report.csv"
  INDICES_DIR="$DATAVIEW_DIR/indices"
  INDICES_REPORT_FILE="$INDICES_DIR/indices_migration_report.csv"
  LOGSTASH_DIR="$OUTPUT_DIR/logstash"
  LOGSTASH_CONF_DIR="$OUTPUT_DIR/logstash/conf"
  LOGSTASH_DATA_DIR="$OUTPUT_DIR/logstash/data"
  DASHBOARD_DIR="$OUTPUT_DIR/dashboards"
  LOGS_DIR="$OUTPUT_DIR/logs"
  REPORT_DIR="$OUTPUT_DIR/report"
  local dir
  for dir in "$OUTPUT_DIR" "$DATAVIEW_DIR" "$INDICES_DIR" "$LOGSTASH_DIR" \
    "$LOGSTASH_CONF_DIR" "$LOGSTASH_DATA_DIR" "$DASHBOARD_DIR" "$LOGS_DIR" "$REPORT_DIR"; do
    mkdir -p "$dir"
  done
  LOG_FILE="$LOGS_DIR/$(date '+%Y-%m-%d-%H-%M-%S').log"
  CURRENT_LOG_FILE="$LOGS_DIR/current.log"
  # Behaviour toggles.
  CONFIG_CLEANUP="${CONFIG_CLEANUP:-false}"
  DEBUG="${DEBUG:-false}"
  # Concurrency: default 2; clamp non-numeric or sub-2 values back to 2.
  CONCURRENCY="${CONCURRENCY:-2}"
  if ! [[ "$CONCURRENCY" =~ ^[0-9]+$ ]] || [ "$CONCURRENCY" -lt 2 ]; then
    CONCURRENCY=2
  fi
  # Index include/exclude patterns; empty means no filtering.
  EXCLUDE_PATTERNS="${EXCLUDE_PATTERNS:-}"
  INCLUDE_ONLY_PATTERNS="${INCLUDE_ONLY_PATTERNS:-}"
  # Logstash pipeline batch size (default 125) and JVM options.
  LS_BATCH_SIZE="${LS_BATCH_SIZE:-125}"
  LS_JAVA_OPTS="${LS_JAVA_OPTS:-}"
  # Allow self-signed certificates on the data-view API when requested.
  CURL_FLAGS=""
  if [ "$DATAVIEW_API_INSECURE" = true ]; then
    CURL_FLAGS="--insecure"
  fi
}
# Sanitize name to remove special characters
# Trim surrounding spaces, then replace every character that is not
# alphanumeric, underscore, or hyphen with '_'. Prints the result on stdout.
sanitize_name() {
  local replacer_char='_'
  local input sanitized
  input=$(echo "$1" | sed 's/^ *//;s/ *$//')
  # Use printf (no trailing newline) instead of echo: tr -c also rewrote the
  # newline echo appended, tacking a spurious '_' onto every sanitized name.
  sanitized=$(printf '%s' "$input" | tr -c '[:alnum:]_-' "$replacer_char")
  echo "$sanitized"
}
# Function to get Logstash PIDs with active network connections
# Print the PIDs of Logstash processes that own a listening/open socket.
# Returns 0 with a space-separated PID list on stdout, 1 when none are found.
get_logstash_processes() {
  local LOGSTASH_PIDS
  LOGSTASH_PIDS=$(pgrep -f logstash 2>/dev/null)
  # No Logstash processes at all.
  if [ -z "$LOGSTASH_PIDS" ]; then
    return 1
  fi
  # Snapshot netstat once instead of invoking it for every PID.
  local NETSTAT_OUT
  NETSTAT_OUT=$(sudo netstat -nptul)
  local FILTERED_PIDS=()
  local PID
  for PID in $LOGSTASH_PIDS; do
    # Anchor the match on netstat's "PID/Program name" column so a PID
    # number cannot accidentally match a port or address elsewhere on the
    # line (the previous bare `grep -q "$PID"` could).
    if grep -qE "[[:space:]]$PID/" <<<"$NETSTAT_OUT"; then
      FILTERED_PIDS+=("$PID")
    fi
  done
  # None of the candidates had network activity.
  if [ ${#FILTERED_PIDS[@]} -eq 0 ]; then
    return 1
  fi
  echo "${FILTERED_PIDS[@]}"
  return 0
}
# Print the PIDs of other invocations of this script running the "migrate"
# action. Returns 0 with the PID list on stdout, 1 when none match.
get_master_processes() {
  local script_name="$0"
  local master_pids
  # pgrep -f matches the full command line: "<script> ... migrate ...".
  master_pids=$(pgrep -f "$script_name.*migrate" 2>/dev/null)
  [ -n "$master_pids" ] || return 1
  echo "$master_pids"
  return 0
}
# Look up a single index by its UUID across the per-dataview JSON files in
# $INDICES_DIR and print a flattened JSON object combining the file's
# root-level fields with that index's entry.
# Arguments: $1 - index UUID to search for.
# Outputs:   one JSON object on stdout on success; errors go to stderr.
# Returns:   0 on success, 1 when the UUID is missing or no file contains it.
get_indices_detail_by_id() {
local uuid=$1
if [[ -n "$uuid" ]]; then
# First file mentioning the UUID wins; assumes UUIDs are unique across
# data views -- TODO confirm.
indices_json_file=$(grep -rl --include="*.json" "$uuid" "$INDICES_DIR" | head -n 1)
if [[ -f "$indices_json_file" ]]; then
# Extract root-level details shared by every index in this file.
local sid data_view index_pattern
sid=$(jq -r '.sid' "$indices_json_file")
data_view=$(jq -r '.["Data View"]' "$indices_json_file")
index_pattern=$(jq -r '.["Index Pattern"]' "$indices_json_file")
# Scan the indices array for the entry whose UUID matches.
while IFS= read -r index_entry; do
indices_uuid=$(echo "$index_entry" | jq -r '.UUID')
if [[ "$indices_uuid" == "$uuid" ]]; then
local indices_name health index_status doc_count primary_data_size store_size
indices_name=$(echo "$index_entry" | jq -r '.["Index Name"]')
health=$(echo "$index_entry" | jq -r '.Health')
index_status=$(echo "$index_entry" | jq -r '.["Index Status"]')
doc_count=$(echo "$index_entry" | jq -r '.["Doc Count"]')
primary_data_size=$(echo "$index_entry" | jq -r '.["Primary Data Size"]')
store_size=$(echo "$index_entry" | jq -r '.["Store Size"]')
# Construct the flattened JSON output with jq so values are escaped.
local raw_json
raw_json=$(jq -n --arg sid "$sid" \
--arg data_view "$data_view" \
--arg index_pattern "$index_pattern" \
--arg uuid "$indices_uuid" \
--arg index_name "$indices_name" \
--arg health "$health" \
--arg index_status "$index_status" \
--arg doc_count "$doc_count" \
--arg primary_data_size "$primary_data_size" \
--arg store_size "$store_size" \
'{
"UUID": $uuid,
"SID": $sid,
"Data View": $data_view,
"Index Pattern": $index_pattern,
"Index Name": $index_name,
"Health": $health,
"Index Status": $index_status,
"Doc Count": $doc_count,
"Primary Data Size": $primary_data_size,
"Store Size": $store_size
}')
# Emit the result and stop at the first match.
echo "$raw_json"
return 0
fi
done < <(jq -c '.indices[]' "$indices_json_file")
# NOTE(review): if the file matched (e.g. UUID appears in an error field)
# but no indices entry has this UUID, the function falls through here and
# prints nothing -- callers only check for empty output; confirm intended.
else
echo "Error: JSON file for UUID $uuid not found in $INDICES_DIR." >&2
return 1
fi
else
echo "Error: UUID is not provided." >&2
return 1
fi
}
# Monitoring function
# Print a live status report for every running Logstash instance: PID, port,
# config file, data path, the index being migrated, pipeline throughput, and
# JVM heap usage. Read-only; intended for interactive monitoring.
status() {
# Temporarily disable `set -e`: the probes below (netstat, curl, jq, jstat)
# may fail benignly on a partially started instance.
set +e
local LOGSTASH_PIDS
LOGSTASH_PIDS=$(get_logstash_processes)
# Check if there are no Logstash processes
if [[ -z "$LOGSTASH_PIDS" ]]; then
echo "No Logstash processes found."
return
fi
echo "============================"
for PID in $LOGSTASH_PIDS; do
echo "Logstash Instance:"
echo "PID: $PID"
echo "----------------------------"
# Listening port: the port part of the Local Address column on the netstat
# line that mentions this PID.
PORT=$(sudo netstat -nptul | awk -v pid="$PID" '$0 ~ pid {split($4, a, ":"); print a[2]}')
echo "Port: ${PORT:-Unavailable}"
# Recover the `-f <config>` argument from the process command line.
CONFIG_FILE=$(sudo ps -aux | awk -v pid="$PID" '$2 == pid {split($0, a, "-f "); split(a[2], b, " "); print b[1]}')
echo "Config File: ${CONFIG_FILE:-Unavailable}"
# Recover `--path.data=<dir>`; run_logstash names that dir after the UUID.
PATH_DATA=$(sudo ps -aux | awk -v pid="$PID" '$2 == pid {split($0, a, "--path.data="); split(a[2], b, " "); print b[1]}')
echo "Data Path: ${PATH_DATA:-Unavailable}"
if [[ -n "$PATH_DATA" ]]; then
INDICES_UUID=$(awk -F '/' '{print $NF}' <<<"$PATH_DATA")
fi
# NOTE(review): INDICES_UUID and indices_name/docs/size are not reset per
# iteration, so a PID without PATH_DATA inherits the previous instance's
# values -- confirm whether that carry-over is intended.
if [[ -n "$INDICES_UUID" ]]; then
echo "Fetching Index Details for UUID: $INDICES_UUID"
indices_details=$(get_indices_detail_by_id "$INDICES_UUID")
if [[ -n "$indices_details" ]]; then
indices_name=$(echo "$indices_details" | jq -r '.["Index Name"]' 2>/dev/null)
indices_docs=$(echo "$indices_details" | jq -r '.["Doc Count"]' 2>/dev/null)
indices_size=$(echo "$indices_details" | jq -r '.["Store Size"]' 2>/dev/null)
else
echo "Failed to fetch index details for UUID: $INDICES_UUID"
fi
else
echo "No UUID found for Logstash PID: $PID"
fi
echo "Index Info:"
echo " UUID: ${INDICES_UUID}"
echo " Name: ${indices_name:-Unknown}"
echo " Docs: ${indices_docs:-Unknown}"
echo " Size: ${indices_size:-Unknown}"
# Fetch pipeline stats from this instance's Logstash monitoring API.
ls_endpoint="http://localhost:$PORT"
PIPELINE_STATE=$(curl -s "$ls_endpoint/_node/stats/pipelines")
PIPELINE_STATUS=$(echo "$PIPELINE_STATE" | jq -r .status 2>/dev/null)
PIPELINE_BATCH_SIZE=$(echo "$PIPELINE_STATE" | jq -r .pipeline.batch_size 2>/dev/null)
PIPELINE_WORKER=$(echo "$PIPELINE_STATE" | jq -r .pipeline.workers 2>/dev/null)
PIPELINE_DIM=$(echo "$PIPELINE_STATE" | jq -r .pipelines.main.events.duration_in_millis 2>/dev/null)
PIPELINE_OUT=$(echo "$PIPELINE_STATE" | jq -r .pipelines.main.events.out 2>/dev/null)
# Throughput: events out / processing duration in seconds.
if [[ -n "$PIPELINE_DIM" && "$PIPELINE_DIM" -gt 0 ]]; then
PIPELINE_RATE=$(awk "BEGIN { printf \"%.2f\", $PIPELINE_OUT / ($PIPELINE_DIM / 1000) }")
else
PIPELINE_RATE=0
fi
# Progress: events out vs total docs in the source index.
if [[ "${indices_docs:-0}" -eq 0 ]]; then
PERCENTAGE=0
else
PERCENTAGE=$(awk "BEGIN { printf \"%.2f\", ${PIPELINE_OUT:-0} / ${indices_docs:-1} * 100 }")
fi
echo "Pipeline Info:"
echo " Status: ${PIPELINE_STATUS:-Unavailable}"
echo " Batch Size: ${PIPELINE_BATCH_SIZE:-Unavailable}"
echo " Workers: ${PIPELINE_WORKER:-Unavailable}"
echo " Out: ${PIPELINE_OUT:-0} / ${indices_docs:-0} (${PERCENTAGE}%)"
echo " Rate: ${PIPELINE_RATE:-0.00} events/sec"
# Heap usage via jstat -gc. NOTE(review): the column arithmetic assumes a
# specific jstat -gc layout -- verify against the bundled JDK's output.
sudo /usr/share/logstash/jdk/bin/jstat -gc "$PID" 2>/dev/null |
awk 'NR > 1 {
used_heap = $3 + $4 + $6 + $8
total_heap = $5 + $7 + $9
printf "Heap Usage: %.2f / %.2f MB\n", used_heap / 1024, total_heap / 1024
}'
echo "----------------------------"
done
echo "End of Logstash Instance"
echo "============================"
set -e
}
# Show the current migration log.
# Arguments: $1 - "true" to follow the log (tail -f), anything else to dump it.
logs() {
  local follow_logs=$1
  if [[ ! -f "$CURRENT_LOG_FILE" ]]; then
    echo "No logs found. Migration might not have started yet."
    exit 1
  fi
  # Compare as a string instead of executing the argument: the original
  # `if $follow_logs` ran arbitrary caller input as a command and treated an
  # empty argument as "follow".
  if [[ "$follow_logs" == true ]]; then
    tail -f "$CURRENT_LOG_FILE"
  else
    cat "$CURRENT_LOG_FILE"
  fi
}
# Stop the whole migration: kill master script invocations, every running
# Logstash instance (walking up their parent chains), and anything recorded in
# the pids file, updating the indices report to "Stopped" as it goes.
stop_all_processes() {
# Temporarily disable `set -e`: kill/pgrep failures here are expected.
set +e
local LOGSTASH_PIDS
LOGSTASH_PIDS=$(get_logstash_processes)
MASTER_PIDS=$(get_master_processes)
# Helper (defined in-function so it sees the locals above): SIGKILL the given
# PID and each of its ancestors up to (but not including) init, optionally
# marking a UUID as Stopped in the indices report.
# NOTE(review): kill -9 gives Logstash no chance to flush; consider TERM first.
kill_parent_processes() {
local PID=$1
local UUID=$2
while [[ -n "$PID" ]]; do
# Get the parent PID
PARENT_PID=$(ps -o ppid= -p "$PID" | xargs)
# Kill the process
sudo kill -9 "$PID" 2>/dev/null
echo "Terminated process with PID $PID (parent PID: $PARENT_PID)"
# If UUID is provided and not empty, update the report
if [[ -n "$UUID" ]]; then
update_indices_report "$UUID" "Stopped"
fi
# Stop if parent PID is 1 (init system)
if [[ "$PARENT_PID" -eq 1 ]]; then
break
fi
# Set PID to the parent PID for the next iteration
PID=$PARENT_PID
done
}
if [[ -n "$MASTER_PIDS" ]]; then
for MPID in $MASTER_PIDS; do
# Terminate the main Logstash process and its parent processes
kill_parent_processes "$MPID"
# NOTE(review): pgrep -x matches a process *name*, not a PID, so this check
# is effectively always false and the success branch always prints --
# `kill -0 "$MPID"` would be the real liveness test. Same below.
if pgrep -x "$MPID" >/dev/null; then
echo "Failed to terminate Master process with PID $MPID."
else
echo "Master process with PID $MPID terminated successfully."
fi
done
else
echo "No Master processes found."
fi
# First, terminate all Logstash processes if they exist
if [[ -n "$LOGSTASH_PIDS" ]]; then
for LPID in $LOGSTASH_PIDS; do
# The basename of --path.data identifies the index UUID (see run_logstash).
PATH_DATA=$(sudo ps -aux | awk -v pid="$LPID" '$2 == pid {split($0, a, "--path.data="); split(a[2], b, " "); print b[1]}')
if [[ -n "$PATH_DATA" ]]; then
if [[ ! -f "$INDICES_REPORT_FILE" ]]; then
echo "Error: Indices report file not found at $INDICES_REPORT_FILE"
INDICES_UUID=""
else
INDICES_UUID=$(awk -F '/' '{print $NF}' <<<"$PATH_DATA")
fi
fi
# NOTE(review): INDICES_UUID is not reset when PATH_DATA is empty, so a
# PID without path.data reuses the previous iteration's UUID.
# Terminate the main Logstash process and its parent processes
kill_parent_processes "$LPID" "$INDICES_UUID"
# Check if the Logstash process was terminated
if pgrep -x "$LPID" >/dev/null; then
echo "Failed to terminate Logstash process with PID $LPID."
else
echo "Logstash process with PID $LPID terminated successfully."
if [[ -n "$INDICES_UUID" ]]; then
update_indices_report "$INDICES_UUID" "Stopped"
fi
fi
done
else
echo "No Logstash processes found."
fi
# Then, process the pids file ("<pid> <uuid>" per line) for associated
# processes, even if no Logstash processes were found above.
if [[ -f "$LOGSTASH_DIR/pids" ]]; then
while read -r pid uuid; do
# Kill the process from the pids file and its parent processes
kill_parent_processes "$pid" "$uuid"
sleep 1
if pgrep -x "$pid" >/dev/null; then
echo "Failed to terminate process with PID $pid for UUID $uuid."
else
echo "Process with PID $pid for UUID $uuid terminated successfully."
# Update the report for this UUID
if [[ -n "$uuid" ]]; then
update_indices_report "$uuid" "Stopped"
fi
# Remove the entry from the pids file
sed -i "/^$pid $uuid$/d" "$LOGSTASH_DIR/pids"
fi
done <"$LOGSTASH_DIR/pids"
fi
set -e
}
# Fetch data views from API and save to file
# Fetch every Kibana dashboard (saved_objects _find) and export each one, with
# its deep references, into a separate .ndjson file under $DASHBOARD_DIR.
get_dashboards() {
  # Bug fix: the original message said "data views" although this function
  # fetches dashboards.
  echo "Fetching dashboards from $KB_ENDPOINT..."
  DASHBOARD_FILE="$DASHBOARD_DIR/dashboards.json"
  curl -s $CURL_FLAGS -u "$ES_USERNAME:$ES_PASSWORD" "$KB_ENDPOINT/api/saved_objects/_find?type=dashboard&per_page=10000" -o "$DASHBOARD_FILE"
  # Abort when the listing file is missing or empty (fetch failed).
  if [[ ! -s "$DASHBOARD_FILE" ]]; then
    echo "No dashboard found or failed to fetch dashboards. Exiting."
    exit 1
  fi
  # Output the content for debugging.
  echo "Response from API:"
  cat "$DASHBOARD_FILE"
  echo ""
  echo "Total Dashboards found:" "$(jq -c '.total' "$DASHBOARD_FILE")"
  # Export each dashboard to its own ndjson file named "<title>-<id>".
  jq -c '.saved_objects[]' "$DASHBOARD_FILE" | while read -r row; do
    id=$(echo "$row" | jq -r '.id')
    title=$(echo "$row" | jq -r '.attributes.title')
    sanitized_dashboard_file_name=$(sanitize_name "$title-$id")
    dashboard_file=$DASHBOARD_DIR/$sanitized_dashboard_file_name.ndjson
    echo "Exporting dashboard: $id $title: $dashboard_file"
    # includeReferencesDeep pulls in visualizations, index patterns, etc.
    curl -s $CURL_FLAGS -u "$ES_USERNAME:$ES_PASSWORD" "$KB_ENDPOINT/api/saved_objects/_export" -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d '{
"objects": [{"type": "dashboard", "id": "'"$id"'"}],
"includeReferencesDeep": true
}' >"$dashboard_file"
  done
  echo "Dashboard export completed. Files are in the $DASHBOARD_DIR directory."
}
# Remove the generated Logstash config and per-run data directory for one
# index, but only when CONFIG_CLEANUP is enabled.
# Arguments: $1 - index UUID (names the data dir), $2 - index name.
logstash_cleanup() {
  local uuid=$1
  local index=$2
  # Sanitize index for the config filename (mirrors generate_logstash_config).
  local sanitized_index
  sanitized_index=$(sanitize_name "$index")
  local config_file="$LOGSTASH_CONF_DIR/${sanitized_index}.conf"
  local logstash_data_dir="$LOGSTASH_DATA_DIR/$uuid"
  if [ "$CONFIG_CLEANUP" = true ]; then
    # -f: a missing config must not abort the script under `set -e`.
    rm -f -- "$config_file"
    # Guard against an empty uuid, which would expand the target to
    # "$LOGSTASH_DATA_DIR/" and wipe every instance's data directory.
    # Quoted (the original was not) so spaces/globs cannot misfire.
    if [ -n "$uuid" ]; then
      rm -rf -- "$logstash_data_dir"
    fi
  fi
}
# Write a Logstash pipeline config for one index: an elasticsearch input
# (scroll-based, optionally date-filtered) feeding an opensearch output that
# preserves each document's original _index and _id.
# Arguments: $1 - index UUID (unused here; kept for interface symmetry with
#            run_logstash/logstash_cleanup), $2 - index name.
generate_logstash_config() {
local uuid=$1
local index=$2
# Sanitize index for the config filename
local sanitized_index=$(sanitize_name "$index")
local config_file="$LOGSTASH_CONF_DIR/${sanitized_index}.conf"
# Build the input query: match-all, optionally wrapped in a bool/range filter
# on $DATE_FIELD_KEY when FILTER_DATE_FROM/TO are set.
get_query() {
if [[ -z "$FILTER_DATE_FROM" && -z "$FILTER_DATE_TO" ]]; then
QUERY='{"query":{"query_string":{"query":"*"}}}'
else
QUERY="{\"query\":{\"bool\":{\"must\":[{\"query_string\":{\"query\":\"*\"}},{\"range\":{\"$DATE_FIELD_KEY\":{"
RANGE_CLAUSES=()
if [[ -n "$FILTER_DATE_FROM" ]]; then
RANGE_CLAUSES+=("\"gte\":\"$FILTER_DATE_FROM\"")
fi
if [[ -n "$FILTER_DATE_TO" ]]; then
RANGE_CLAUSES+=("\"lte\":\"$FILTER_DATE_TO\"")
fi
# Join the range clauses with a comma
QUERY+=$(
IFS=','
echo "${RANGE_CLAUSES[*]}"
)
QUERY+="}}}]}}}"
fi
echo "$QUERY"
}
local query=$(get_query)
# Input section. The unquoted EOF heredoc expands $vars now; \${...} escapes
# leave literal ${...} placeholders for Logstash to resolve from the
# environment at runtime. "$index,-.*" excludes dot-prefixed system indices.
cat <<EOF >"$config_file"
input {
elasticsearch {
hosts => ["${ES_ENDPOINT#https://}"]
user => "\${ES_USERNAME}"
ssl => $ES_SSL
password => "\${ES_PASSWORD}"
index => "$index,-.*"
query => '$query'
scroll => "5m"
size => $ES_BATCH_SIZE
docinfo => true
docinfo_target => "[@metadata][doc]"
EOF
# Add ca_file only if ES_CA_FILE is set
if [ -n "$ES_CA_FILE" ]; then
echo " ca_file => \"$ES_CA_FILE\"" >>"$config_file"
fi
# Close the input and start output section
cat <<EOF >>"$config_file"
}
}
output {
EOF
# Add stdout output if DEBUG is true
if [ "$DEBUG" = true ]; then
echo " stdout { codec => json }" >>"$config_file"
fi
# Opensearch output; the %{[@metadata][doc]...} sprintf references route each
# event to its original index name and document id.
cat <<EOF >>"$config_file"
opensearch {
hosts => ["$OS_ENDPOINT"]
auth_type => {
type => 'basic'
user => "\${OS_USERNAME}"
password => "\${OS_PASSWORD}"
}
ssl => $OS_SSL
ssl_certificate_verification => $OS_SSL_CERT_VERIFY
index => "%{[@metadata][doc][_index]}"
document_id => "%{[@metadata][doc][_id]}"
EOF
# Add cacert only if OS_CA_FILE is set
if [ -n "$OS_CA_FILE" ]; then
echo " cacert => \"$OS_CA_FILE\"" >>"$config_file"
fi
# Close the opensearch output and the output block
cat <<EOF >>"$config_file"
}
}
EOF
echo "Logstash configuration for Index $index created as $config_file"
}
# Validate and run the generated Logstash config for one index, tracking
# progress in the indices report and the shared pids file.
# Arguments: $1 - index UUID, $2 - index name.
# Returns:   0 when the pipeline finishes cleanly, 1 on invalid config or
#            pipeline failure.
run_logstash() {
  local uuid=$1
  local index=$2
  # Sanitize index for the config filename (must match generate_logstash_config).
  local sanitized_index
  sanitized_index=$(sanitize_name "$index")
  local config_file="$LOGSTASH_CONF_DIR/${sanitized_index}.conf"
  # Each instance gets its own path.data so parallel runs don't collide.
  local logstash_data_dir="$LOGSTASH_DATA_DIR/$uuid"
  mkdir -p "$logstash_data_dir"
  update_indices_report "$uuid" "InProgress"
  # Exported (and passed through sudo -E) so the config's ${VAR} placeholders
  # resolve inside Logstash.
  export ES_USERNAME="$ES_USERNAME"
  export ES_PASSWORD="$ES_PASSWORD"
  export OS_USERNAME="$OS_USERNAME"
  export OS_PASSWORD="$OS_PASSWORD"
  export LS_JAVA_OPTS="$LS_JAVA_OPTS"
  # Validate the configuration before launching the real run.
  echo "Testing Logstash configuration for $index..."
  if sudo -E /usr/share/logstash/bin/logstash -f "$config_file" --path.data="$logstash_data_dir" --config.test_and_exit; then
    echo "Logstash configuration for $index is valid."
    echo "Running Logstash for index $index..."
    sudo -E /usr/share/logstash/bin/logstash -b "$LS_BATCH_SIZE" -f "$config_file" --path.data="$logstash_data_dir" &
    local pid=$!
    echo "Logstash for index $index started with PID $pid."
    echo "$pid $uuid" >>"$LOGSTASH_DIR/pids" # Let stop_all_processes find us.
    # Put `wait` in the `if` condition: the original `wait $pid` followed by
    # `[ $? -eq 0 ]` aborted the whole script under the global `set -e` when
    # Logstash failed, leaving the report stuck at InProgress and making the
    # Failed branch unreachable.
    if wait "$pid"; then
      echo "Index $index processed successfully."
      update_indices_report "$uuid" "Done"
      sed -i "/$pid $uuid/d" "$LOGSTASH_DIR/pids" # De-register the finished run.
      return 0
    else
      echo "Failed to process Index $index."
      update_indices_report "$uuid" "Failed"
      sed -i "/$pid $uuid/d" "$LOGSTASH_DIR/pids" # De-register the failed run.
      return 1
    fi
  else
    echo "Logstash configuration for $index is invalid."
    update_indices_report "$uuid" "Failed"
    return 1
  fi
}
# Initialize report file with all indices marked as UnProcessed
# Seed (or refresh) the indices migration report CSV from one per-dataview
# indices JSON file: new indices are added as "UnProcessed"; existing rows are
# rewritten as "Updated" when the source doc count grew.
# Arguments: $1 - path to the indices JSON produced by fetch_indices.
generate_initial_indices_report() {
local indices_file=$1
echo "Generating initial report for $indices_file"
# Check if jq is installed
if ! command -v jq &>/dev/null; then
echo "Error: jq is required but not installed."
exit 1
fi
# Check if the indices file exists
if [[ ! -f "$indices_file" ]]; then
echo "Error: Indices file '$indices_file' not found."
exit 1
fi
# Create the CSV with its header row if it doesn't exist yet.
if [[ ! -f "$INDICES_REPORT_FILE" ]]; then
echo "uuid, sid, Index Pattern, Index, Doc Count, Primary Data Size, Start Time, Last Update, Status" >"$INDICES_REPORT_FILE"
fi
# Root-level fields shared by every index row from this file.
sid=$(jq -r '.sid' "$indices_file")
index_pattern=$(jq -r '.["Index Pattern"]' "$indices_file")
# Set current time for Start Time and Last Update
current_time=$(date +"%Y-%m-%d %H:%M:%S")
# One jq pass streams the indices array; per-entry jq calls extract fields.
# (The while body runs in a pipeline subshell, which is fine here: it only
# writes to the report file, not to shell variables.)
jq -c '.indices[]' "$indices_file" | while IFS= read -r index; do
uuid=$(echo "$index" | jq -r '.UUID')
index_name=$(echo "$index" | jq -r '.["Index Name"]')
doc_count=$(echo "$index" | jq -r '.["Doc Count"]')
primary_size=$(echo "$index" | jq -r '.["Primary Data Size"]')
# Validate the extracted fields
if [[ -z "$uuid" || -z "$index_name" ]]; then
echo "Warning: Missing UUID or Index Name for an entry, skipping."
continue
fi
# Check if the UUID is already in the report file
if grep -q "^$uuid," "$INDICES_REPORT_FILE"; then
# Extract existing values from the report file
existing_line=$(grep "^$uuid," "$INDICES_REPORT_FILE")
existing_doc_count=$(echo "$existing_line" | cut -d',' -f5 | xargs)
# Compare and update if the new doc count is greater
if [[ "$doc_count" -gt "$existing_doc_count" ]]; then
# Rewrite the row (Start Time cleared, Status=Updated) so the index is
# picked up again. NOTE(review): sed interpolates $index_pattern etc.;
# values containing '|' or '&' would corrupt the row.
sed -i "s|^$uuid,.*|$uuid, $sid, $index_pattern, $index_name, $doc_count, $primary_size, , $current_time, Updated|" "$INDICES_REPORT_FILE"
fi
else
# Add new entry if UUID is not present
echo "$uuid, $sid, $index_pattern, $index_name, $doc_count, $primary_size, , $current_time, UnProcessed" >>"$INDICES_REPORT_FILE"
fi
done
}
# Resolve a data view's index pattern against ES _cat/indices and cache the
# result as $INDICES_DIR/<sid>.json. On HTTP failure or an empty match the
# JSON is still written (with an error field / empty indices array) so later
# stages can proceed. Skips the fetch entirely when the cache file exists.
# Arguments: $1 - data view id, $2 - data view name, $3 - index pattern title.
fetch_indices() {
local id=$1
local name=$2
local title=$3
local sanitized_title=$(sanitize_name "$title")
local sid=$(sanitize_name "$id")
local indices_json_file="$INDICES_DIR/$sid.json"
if [[ ! -f "$indices_json_file" ]]; then
echo "Fetching Indices List of data view $title from $ES_ENDPOINT..."
# Fetch the indices list; -w appends the HTTP status code to the body.
# Requested columns (in order): index,health,status,uuid,pri,rep,
# docs.count,docs.deleted,store.size,pri.store.size,rep.store.size.
response=$(curl -s -w "%{http_code}" $CURL_FLAGS -u "$ES_USERNAME:$ES_PASSWORD" \
"$ES_ENDPOINT/_cat/indices/$title?h=index,health,status,uuid,pri,rep,docs.count,docs.deleted,store.size,pri.store.size,rep.store.size")
http_code="${response: -3}" # Extract last 3 characters as HTTP status code
raw_indices_list="${response%???}" # Remove last 3 characters to get the actual response body
# Check if the HTTP status indicates a failure
if [[ "$http_code" -ne 200 ]]; then
echo "Error: Failed to fetch indices for data view $title. HTTP Status: $http_code"
# Preserve the error in the cache file: as parsed JSON when the body is
# valid JSON, otherwise as a descriptive string.
if echo "$raw_indices_list" | jq . >/dev/null 2>&1; then
# If JSON, include it directly in the error field
jq -n --arg sid "$sid" \
--arg name "$name" \
--arg title "$title" \
--argjson error "$raw_indices_list" \
'{
sid: $sid,
"Data View": $name,
"Index Pattern": $title,
indices: [],
error: $error
}' >"$indices_json_file"
else
# If not JSON, treat it as a string
jq -n --arg sid "$sid" \
--arg name "$name" \
--arg title "$title" \
--arg error "Failed to fetch indices: HTTP Status $http_code - $raw_indices_list" \
'{
sid: $sid,
"Data View": $name,
"Index Pattern": $title,
indices: [],
error: $error
}' >"$indices_json_file"
fi
return
fi
# Check if indices were returned (empty response means no indices)
if [[ -z "$raw_indices_list" ]]; then
echo "No indices found for data view $title. Saving empty indices list to JSON file."
# Save JSON with empty indices and no error
jq -n --arg sid "$sid" \
--arg name "$name" \
--arg title "$title" \
'{
sid: $sid,
"Data View": $name,
"Index Pattern": $title,
indices: []
}' >"$indices_json_file"
return
fi
# Initialize the JSON file structure for successful fetch
jq -n --arg sid "$sid" \
--arg name "$name" \
--arg title "$title" \
'{
sid: $sid,
"Data View": $name,
"Index Pattern": $title,
indices: []
}' >"$indices_json_file"
# Append one entry per _cat/indices output line into the indices array.
while IFS= read -r line; do
# Check if the line is empty or only contains whitespace
[[ -z "$line" ]] && continue
# Whitespace-split the row into the requested columns (see curl above):
# 0=index 1=health 2=status 3=uuid 6=docs.count 8=store.size
# 9=pri.store.size. NOTE(review): a missing column (e.g. docs.count on a
# closed index) would shift every later field -- confirm against ES.
read -ra columns <<<"$line"
uuid="${columns[3]}"
index_name="${columns[0]}"
health="${columns[1]}"
index_status="${columns[2]}"
doc_count="${columns[6]}"
primary_data_size="${columns[9]}"
store_size="${columns[8]}"
# Append index data to the indices array in the JSON file
jq --arg uuid "$uuid" \
--arg index_name "$index_name" \
--arg health "$health" \
--arg index_status "$index_status" \
--arg doc_count "$doc_count" \
--arg primary_data_size "$primary_data_size" \
--arg store_size "$store_size" \
'.indices += [{
UUID: $uuid,
"Index Name": $index_name,
Health: $health,
"Index Status": $index_status,
"Doc Count": $doc_count,
"Primary Data Size": $primary_data_size,
"Store Size": $store_size
}]' "$indices_json_file" >tmp.json && mv tmp.json "$indices_json_file"
done <<<"$raw_indices_list"
fi
echo "Indices details for data view $title saved to $indices_json_file"
}
# Fetch data views from API and save to file
# Download the Kibana data-view list into $DATAVIEW_FILE and sort it by id so
# later per-view processing is deterministic. Exits on fetch failure.
fetch_dataviews() {
  echo "Fetching data views from $KB_ENDPOINT..."
  curl -s $CURL_FLAGS -u "$ES_USERNAME:$ES_PASSWORD" "$KB_ENDPOINT/api/data_views" -o "$DATAVIEW_FILE"
  # A missing or empty file means the fetch failed outright.
  if [[ ! -s "$DATAVIEW_FILE" ]]; then
    echo "No data views found or failed to fetch data views. Exiting."
    exit 1
  fi
  # Rewrite the file with the data_view array sorted by id.
  jq -c '.data_view |= sort_by(.id)' "$DATAVIEW_FILE" >"$DATAVIEW_FILE.tmp" && mv "$DATAVIEW_FILE.tmp" "$DATAVIEW_FILE"
  # Dump the payload for debugging.
  echo "Response from API:"
  cat "$DATAVIEW_FILE"
  echo ""
}
# Initialize report file with all data views marked as UnProcessed
# Seed the dataview migration report: register every data view from
# $DATAVIEW_FILE as "UnProcessed" (if not already present), then fetch its
# indices and seed the indices report for each.
generate_initial_report() {
# Create the CSV with its header row if it doesn't exist yet.
if [[ ! -f "$DATAVIEW_REPORT_FILE" ]]; then
echo "sid, id, Data View, Index Pattern, Status" >"$DATAVIEW_REPORT_FILE"
fi
# Add all data views to the report with "UnProcessed" status
jq -c '.data_view[]' "$DATAVIEW_FILE" | while read -r row; do
id=$(echo "$row" | jq -r '.id')
name=$(echo "$row" | jq -r '.name')
title=$(echo "$row" | jq -r '.title')
sid=$(sanitize_name "$id")
# Only append rows that are not already tracked (resume support).
if ! grep -q "^$sid," "$DATAVIEW_REPORT_FILE"; then
echo "$sid, $id, $name, $title, UnProcessed" >>"$DATAVIEW_REPORT_FILE"
fi
indices_json_file="$INDICES_DIR/$sid.json"
# Fetch Indices List
# NOTE(review): `exit 1` here exits only the pipeline's while-subshell; the
# script still stops because the failed pipeline status trips `set -e`.
if ! fetch_indices "$id" "$name" "$title"; then
echo "Error: Failed to fetch Indices details for data view $title"
exit 1
else
# Generate Initial Indices Report
if ! generate_initial_indices_report "$indices_json_file"; then
echo "Error: Failed to generate the initial indices report at $INDICES_REPORT_FILE"
exit 1
fi
fi
done
}
# Update or append status in the report file
# Insert or update one data view's row in the dataview migration report, then
# snapshot the report at most once every 15 minutes.
# Arguments: $1 - data view id, $2 - name, $3 - index pattern, $4 - status.
update_report() {
  local id=$1
  local name=$2
  local index_pattern=$3
  local status=$4
  local sid
  sid=$(sanitize_name "$id")
  if grep -q "^$sid," "$DATAVIEW_REPORT_FILE"; then
    # NOTE(review): $id/$name/$index_pattern are interpolated into the sed
    # replacement; values containing '/', '&' or '\' would corrupt the row.
    sed -i "s/^$sid,.*/$sid, $id, $name, $index_pattern, $status/" "$DATAVIEW_REPORT_FILE"
  else
    echo "$sid, $id, $name, $index_pattern, $status" >>"$DATAVIEW_REPORT_FILE"
  fi
  # Backup strategy: create a backup only if 15 minutes have passed since the
  # last timestamped backup (the "-latest" copy is excluded from the probe so
  # it cannot suppress future backups).
  BKP_DATAVIEW_REPORT_FILE="$DATAVIEW_DIR/dataviews_migration_report-$(date '+%Y-%m-%d-%H-%M').csv"
  if [[ $(find "$DATAVIEW_DIR" -name "dataviews_migration_report-*.csv" ! -name "*-latest.csv" -mmin -15 | wc -l) -eq 0 ]]; then
    if [[ -f "$DATAVIEW_REPORT_FILE" ]]; then
      cp "$DATAVIEW_REPORT_FILE" "$BKP_DATAVIEW_REPORT_FILE"
      # Bug fix: the "latest" snapshot previously overwrote
      # indices_migration_report-latest.csv with dataview data.
      cp "$DATAVIEW_REPORT_FILE" "$DATAVIEW_DIR/dataviews_migration_report-latest.csv"
      echo "Backup created for Data view Report: $BKP_DATAVIEW_REPORT_FILE"
    else
      echo "Error: Data view report file does not exist: $DATAVIEW_REPORT_FILE"
      exit 1
    fi
  fi
}
# Verify if the data view should be processed or skipped
# Decide whether a data view still needs migrating.
# Arguments: $1 - data view id, $2 - name, $3 - index pattern title.
# Returns:   0 to process it; 1 to skip (already Done/Skipped, a system index
#            with IGNORE_SYSTEM_INDEXES=true, or the index doesn't exist --
#            the latter two are also recorded as "Skipped" in the report).
verify_dataview() {
local id=$1
local name=$2
local title=$3
local sid=$(sanitize_name "$id")
# Read this data view's current status (column 5) from the report file.
local status=$(grep -E "^$sid," "$DATAVIEW_REPORT_FILE" | cut -d ',' -f5 | tr -d ' ')
# If status is "Done" or "Skipped", skip processing
if [[ "$status" == "Done" || "$status" == "Skipped" ]]; then
echo "Data view $title is already processed. Skipping..."
return 1
fi
# Skip dot-prefixed (system) index patterns when configured to do so.
# NOTE(review): IGNORE_SYSTEM_INDEXES has no default in setup_variables --
# confirm it is meant to come only from env.sh.
if [[ "$IGNORE_SYSTEM_INDEXES" = true && "$title" == .* ]]; then
echo "Ignoring system index: $title"
update_report "$id" "$name" "$title" "Skipped"
return 1
fi
# Probe _cat/indices for the pattern; anything but HTTP 200 means no match.
if ! curl -s $CURL_FLAGS -u "$ES_USERNAME:$ES_PASSWORD" -o /dev/null -w "%{http_code}" "$ES_ENDPOINT/_cat/indices/$title" | grep -q "200"; then
echo "Index $title does not exist. Skipping this data view."
update_report "$id" "$name" "$title" "Skipped"
return 1
fi
return 0
}
# Update or append status in the report file
update_indices_report() {
local uuid="$1"
local status="$2"
if [[ -z "$uuid" || -z "$status" ]]; then
echo "Error: UUID or status is missing."
return 1
fi
# Set the current time
local current_time
current_time=$(date +"%Y-%m-%d %H:%M:%S")
if [[ ! -f "$INDICES_REPORT_FILE" || ! -s "$INDICES_REPORT_FILE" ]]; then
echo "Warning: Report file is missing or blank. Generating..."
echo "uuid, sid, Index Pattern, Index, Doc Count, Primary Data Size, Start Time, Last Update, Status" >"$INDICES_REPORT_FILE"
fi
# Check if UUID exists in the file
if grep -q "^$uuid, " "$INDICES_REPORT_FILE"; then
# Update Status, Last Update, and Start Time if empty
awk -v uuid="$uuid" -v status="$status" -v current_time="$current_time" '
BEGIN { FS = OFS = ", " }
NR == 1 { print; next } # Print the header line
$1 == uuid {
$9 = status # Update Status
$8 = current_time # Update Last Update
if ($7 == "") $7 = current_time # Update Start Time only if empty
}
{ print } # Print all lines
' "$INDICES_REPORT_FILE" >tmpfile && mv tmpfile "$INDICES_REPORT_FILE"
else
indices_details=$(get_indices_detail_by_id "$uuid")
if [[ -n "$indices_details" ]]; then