diff --git a/rickshaw.json b/rickshaw.json index 995840d..aa63152 100644 --- a/rickshaw.json +++ b/rickshaw.json @@ -6,7 +6,7 @@ }, "tool": "sysstat", "controller": { - "post-script": "%tool-dir%sysstat-post-process" + "post-script": "%tool-dir%sysstat-post-process.py" }, "collector": { "files-from-controller": [ diff --git a/sysstat-post-process b/sysstat-post-process deleted file mode 100755 index ac5fa03..0000000 --- a/sysstat-post-process +++ /dev/null @@ -1,760 +0,0 @@ -#!/usr/bin/perl -## -*- mode: perl; indent-tabs-mode: nil; perl-indent-level: 4 -*- -## vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=perl - -use strict; -use warnings; -use JSON::XS; -use JSON::Validator; -use Data::Dumper; -use Time::Piece; - -BEGIN { - if (!(exists $ENV{'TOOLBOX_HOME'} && -d "$ENV{'TOOLBOX_HOME'}/perl")) { - print "This script requires libraries that are provided by the toolbox project.\n"; - print "Toolbox can be acquired from https://github.com/perftool-incubator/toolbox and\n"; - print "then use 'export TOOLBOX_HOME=/path/to/toolbox' so that it can be located.\n"; - exit 1; - } -} -use lib "$ENV{'TOOLBOX_HOME'}/perl"; -use toolbox::json; -use toolbox::cpu; -use toolbox::metrics; - -sub build_netdev_types { - my %netdev_types; - my $netdev_types_file = "netdev-types.txt"; - if (-f $netdev_types_file) { - my $fh = new IO::Uncompress::UnXz $netdev_types_file, Transparent => 1 || die "Could not open $netdev_types_file"; - while ( my $line = <$fh>) { - chomp $line; - if ($line =~ /^(\S+)\s+\.\.\/\.\.\/devices\/(\S+)$/) { - $netdev_types{$1} = $2; - } - } - close($fh); - } - return \%netdev_types; -} - -sub get_netdev_type { - my $netdev_name = shift; - my $netdev_types_ref = shift; - if (exists $$netdev_types_ref{$netdev_name}) { - if ($$netdev_types_ref{$netdev_name} =~ /^pci/) { - return "physical"; - } elsif ($$netdev_types_ref{$netdev_name} =~ /^virtual/) { - return "virtual"; - } else { - return "unknown"; - } - } - return "unknown"; -} - -sub get_hms_ms { - my $hour = shift; - my $min = shift; - my $sec = shift; - - return (1000 * ($hour * 60 * 60 + $min * 60 + $sec)); -} - -sub advance_ymd { - my $ymd_timestamp_ms = shift; - - return ($ymd_timestamp_ms + (1000 * 60 * 60 * 24)); -} - -print "sysstat-post-process\n"; -my @pids; -my @mpstat_pids; -my @cpu_topology; # store a hash containing package, die, core, and thread ID per element, CPU ID = index -my %metric_types; -my %no_names = (); -opendir(my $dh, "."); -my @files = sort readdir($dh); -printf "files to process:\n @files"; - -my @sar_files = grep(/^sar-stdout.txt(\.xz){0,1}$/, @files); -if (scalar @sar_files > 1) { - printf "ERROR: there should never be more than one sar file\n%s\n", @sar_files; -} -if (scalar @sar_files == 0) { - printf "ERROR: there is no sar file\n"; -} -if (scalar @sar_files == 1) { - my @sar_pids; - my $log_file = $sar_files[0]; - printf "Found %s\n", $log_file; - my @sources = (qw(sar-mem sar-scheduler sar-io sar-tasks sar-net)); - for my $source (@sources) { - my %desc = ('class' => 'throughput', 'source' => $source); - if (my $pid = fork) { - push(@sar_pids, $pid); - } else { - printf "Post-processing for %s started\n", $source; - my $netdev_types_ref = build_netdev_types; - my $ymd_timestamp_ms; # Epochtime in milliseconds - my $scan_mode = ""; - my $hms_ms; # Number of milliseconds for only today's hour-minute-seconds (NOT since epoch, since 12:00 AM) - my $prev_hms_ms; - my $log_fh = new IO::Uncompress::UnXz $log_file, Transparent => 1 || die "[ERROR]could not open file " . $log_file; - while (<$log_fh>) { - chomp; - if (/^Linux\s\S+\s\S+\s+(\d+-\d+-\d+)\s+\S+\s+\S+/) { - #Linux 4.18.0-147.8.1.el8_1.x86_64 (worker000) 2020-06-24 _x86_64_ (64 CPU) - my $ymd = $1; - $ymd_timestamp_ms = `date +%s%N -d $ymd -u` / 1000000; - } - if ($source eq "sar-mem") { - if ( /(\d+:\d+:\d+)\s+pgpgin\/s\s+pgpgout\/s\s+fault\/s\s+majflt\/s\s+pgfree\/s\s+pgscank\/s\s+pgscand\/s\s+pgsteal\/s\s+%vmeff$/ ) { - #21:26:11 pgpgin/s pgpgout/s fault/s majflt/s pgfree/s pgscank/s pgscand/s pgsteal/s %vmeff - $scan_mode = "paging"; - } elsif ( /(\d+:\d+:\d+)\s+pswpin\/s\s+pswpout\/s$/ ) { - #16:19:36 pswpin/s pswpout/s - $scan_mode = "swapping"; - } elsif ( /(\d+:\d+:\d+)\s+%smem-10\s+%smem-60\s+%smem-300\s+%smem\s+%fmem-10\s+%fmem-60\s+%fmem-300\s+%fmem$/ ) { - #16:19:33 %smem-10 %smem-60 %smem-300 %smem %fmem-10 %fmem-60 %fmem-300 %fmem - $scan_mode = "memory-starved"; - } elsif ( /(\d+:\d+:\d+)\s+kbmemfree\s+kbavail\s+kbmemused\s+%memused\s+kbbuffers\s+kbcached\s+kbcommit\s+%commit\s+kbactive\s+kbinact\s+kbdirty$/ ) { - #21:28:50 kbmemfree kbavail kbmemused %memused kbbuffers kbcached kbcommit %commit kbactive kbinact kbdirty - $scan_mode = "memory-utilization"; - } - elsif ($scan_mode eq "paging") { - if ( /(\d+):(\d+):(\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)$/ ) { - #21:26:14 0.00 385.67 15381.00 0.00 9596.00 0.00 0.00 0.00 0.00 - my $hour = $1; - my $min = $2; - my $sec = $3; - my $kb_paged_in = $4; - my $kb_paged_out = $5; - my %faults; - $faults{'minor'} = $6 - $7; - $faults{'major'} = $7; - my $freed_pages = $8; - my $kswapd_scanned_pages = $9; - my $scanned_pages = $10; - my $reclaimed_pages = $11; - my $vm_efficiency = $12; - $hms_ms = get_hms_ms($hour, $min, $sec); - if (defined $prev_hms_ms and $prev_hms_ms > $hms_ms) { - # hour-minute-second is lower than last reading so one day has passed - $ymd_timestamp_ms = advance_ymd($ymd_timestamp_ms); - } - my %sample = ('end' => $ymd_timestamp_ms + $hms_ms); - - $desc{'type'} = 'KB-Paged-in-sec'; - $sample{'value'} = $kb_paged_in; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'KB-Paged-out-sec'; - $sample{'value'} = $kb_paged_out; - log_sample($source, \%desc, \%no_names, \%sample); - for my $fault_type (keys %faults) { - $desc{'type'} = 'Page-faults-sec'; - my %names = ('type' => $fault_type); - $sample{'value'} = $faults{$fault_type}; - log_sample($source, \%desc, \%names, \%sample); - } - $desc{'type'} = 'Pages-freed-sec'; - $sample{'value'} = $freed_pages; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'kswapd-scanned-pages-sec'; - $sample{'value'} = $kswapd_scanned_pages; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'scanned-pages-sec'; - $sample{'value'} = $scanned_pages; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'reclaimed-pages-sec'; - $sample{'value'} = $reclaimed_pages; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'VM-Efficiency'; - $sample{'value'} = $vm_efficiency; - log_sample($source, \%desc, \%no_names, \%sample); - } elsif ($_ eq '') { - $scan_mode = ''; - } - } elsif ($scan_mode eq "swapping") { - if ( /(\d+):(\d+):(\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)$/ ) { - #16:19:39 0.00 0.00 - my $hour = $1; - my $min = $2; - my $sec = $3; - my $pages_in = $4; - my $pages_out = $5; - $hms_ms = get_hms_ms($hour, $min, $sec); - if (defined $prev_hms_ms and $prev_hms_ms > $hms_ms) { - # hour-minute-second is lower than last reading so one day has passed - $ymd_timestamp_ms = advance_ymd($ymd_timestamp_ms); - } - my %sample = ('end' => $ymd_timestamp_ms + $hms_ms); - $desc{'type'} = 'Pages-swapped-in-sec'; - $sample{'value'} = $pages_in; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'Pages-swapped-out-sec'; - $sample{'value'} = $pages_out; - log_sample($source, \%desc, \%no_names, \%sample); - } elsif ($_ eq '') { - $scan_mode = ''; - } - } elsif ($scan_mode eq "memory-starved") { - if ( /(\d+):(\d+):(\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)$/ ) { - #16:19:36 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 - my $hour = $1; - my $min = $2; - my $sec = $3; - my %memory_waiting; - $memory_waiting{'010s'} = $4; - $memory_waiting{'060s'} = $5; - $memory_waiting{'300s'} = $6; - $memory_waiting{'last_interval'} = $7; - my %memory_stalled; - $memory_stalled{'010s'} = $8; - $memory_stalled{'060s'} = $9; - $memory_stalled{'300s'} = $10; - $memory_stalled{'last_interval'} = $11; - $hms_ms = get_hms_ms($hour, $min, $sec); - if (defined $prev_hms_ms and $prev_hms_ms > $hms_ms) { - # hour-minute-second is lower than last reading so one day has passed - $ymd_timestamp_ms = advance_ymd($ymd_timestamp_ms); - } - my %sample = ('end' => $ymd_timestamp_ms + $hms_ms); - for my $time_window (keys %memory_waiting) { - $desc{'type'} = '%-Time-Tasks-Waiting-on-Memory-' . $time_window; - $sample{'value'} = $memory_waiting{$time_window}; - log_sample($source, \%desc, \%no_names, \%sample); - } - for my $time_window (keys %memory_stalled) { - $desc{'type'} = '%-Time-Non-Idle-Tasks-Stalled-on-Memory-' . $time_window; - $sample{'value'} = $memory_stalled{$time_window}; - log_sample($source, \%desc, \%no_names, \%sample); - } - } elsif ($_ eq '') { - $scan_mode = ''; - } - } elsif ($scan_mode eq "memory-utilization") { - if ( /(\d+):(\d+):(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+\.\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+\.\d+)\s+(\d+)\s+(\d+)\s+(\d+)$/ ) { - #21:28:53 1865772 66317648 28872324 29.63 40 63292632 58318396 59.84 41768712 32304200 11020 - my $hour = $1; - my $min = $2; - my $sec = $3; - my $kb_mem_free = $4; - my $kb_available = $5; - my $kb_mem_used = $6; - my $percent_mem_used = $7; - my $kb_buffers = $8; - my $kb_cached = $9; - my $kb_commit = $10; - my $percent_commit = $11; - my $kb_active = $12; - my $kb_inactive = $13; - my $kb_dirty = $14; - $hms_ms = get_hms_ms($hour, $min, $sec); - if (defined $prev_hms_ms and $prev_hms_ms > $hms_ms) { - # hour-minute-second is lower than last reading so one day has passed - $ymd_timestamp_ms = advance_ymd($ymd_timestamp_ms); - } - my %sample = ('end' => $ymd_timestamp_ms + $hms_ms); - - $desc{'type'} = 'Memory-Free-KB'; - $sample{'value'} = $kb_mem_free; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'Memory-Available-KB'; - $sample{'value'} = $kb_available; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'Memory-Used-KB'; - $sample{'value'} = $kb_mem_used; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'Memory-Used-Percent'; - $sample{'value'} = $percent_mem_used; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'Memory-Buffers-KB'; - $sample{'value'} = $kb_buffers; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'Memory-Cached-KB'; - $sample{'value'} = $kb_cached; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'Memory-Commit-KB'; - $sample{'value'} = $kb_commit; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'Memory-Commit-Percent'; - $sample{'value'} = $percent_commit; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'Memory-Active-KB'; - $sample{'value'} = $kb_active; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'Memory-Inactive-KB'; - $sample{'value'} = $kb_inactive; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'Memory-Dirty-KB'; - $sample{'value'} = $kb_dirty; - log_sample($source, \%desc, \%no_names, \%sample); - } elsif ($_ eq '') { - $scan_mode = ''; - } - } - } elsif ($source eq "sar-io") { - if ( /(\d+:\d+:\d+)\s+%sio-10\s+%sio-60\s+%sio-300\s+%sio\s+%fio-10\s+%fio-60\s+%fio-300\s+%fio$/ ) { - #16:19:33 %sio-10 %sio-60 %sio-300 %sio %fio-10 %fio-60 %fio-300 %fio - $scan_mode = "io-starved"; - } elsif ($scan_mode eq "io-starved") { - if ( /(\d+):(\d+):(\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)$/ ) { - #16:19:36 0.13 0.17 0.04 0.01 0.10 0.12 0.03 0.00 - my $hour = $1; - my $min = $2; - my $sec = $3; - my %io_time_lost; - $io_time_lost{'010s'} = $4; - $io_time_lost{'060s'} = $5; - $io_time_lost{'300s'} = $6; - $io_time_lost{'last_interval'} = $7; - my %io_time_stalled; - $io_time_stalled{'010s'} = $8; - $io_time_stalled{'060s'} = $9; - $io_time_stalled{'300s'} = $10; - $io_time_stalled{'last_interval'} = $11; - $hms_ms = get_hms_ms($hour, $min, $sec); - if (defined $prev_hms_ms and $prev_hms_ms > $hms_ms) { - # hour-minute-second is lower than last reading so one day has passed - $ymd_timestamp_ms = advance_ymd($ymd_timestamp_ms); - } - my %sample = ('end' => $ymd_timestamp_ms + $hms_ms); - for my $time_window (keys %io_time_lost) { - $desc{'type'} = '%-Time-Tasks-Lost-Waiting-on-IO-' . $time_window; - $sample{'value'} = $io_time_lost{$time_window}; - log_sample($source, \%desc, \%no_names, \%sample); - } - for my $time_window (keys %io_time_stalled) { - $desc{'type'} = '%-Time-Tasks-Stalled-Waiting-on-IO-' . $time_window; - $sample{'value'} = $io_time_stalled{$time_window}; - log_sample($source, \%desc, \%no_names, \%sample); - } - } elsif ($_ eq '') { - $scan_mode = ''; - } - } - } elsif ($source eq "sar-tasks") { - if ( /(\d+:\d+:\d+)\s+proc\/s\s+cswch\/s$/ ) { - #15:41:30 proc/s cswch/s - $scan_mode = "task"; - } elsif ($scan_mode eq "task") { - if ( /(\d+):(\d+):(\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)$/ ) { - #15:41:33 137.67 2529.67 - my $hour = $1; - my $min = $2; - my $sec = $3; - my $processes_created = $4; - my $context_switches = $5; - $hms_ms = get_hms_ms($hour, $min, $sec); - if (defined $prev_hms_ms and $prev_hms_ms > $hms_ms) { - # hour-minute-second is lower than last reading so one day has passed - $ymd_timestamp_ms = advance_ymd($ymd_timestamp_ms); - } - my %sample = ('end' => $ymd_timestamp_ms + $hms_ms); - $desc{'type'} = 'Processes-created-sec'; - $sample{'value'} = $processes_created; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'Context-switches-sec'; - $sample{'value'} = $context_switches; - log_sample($source, \%desc, \%no_names, \%sample); - } elsif ($_ eq '') { - $scan_mode = ''; - } - } - } elsif ($source eq "sar-scheduler") { - if ( /(\d+:\d+:\d+)\s+%scpu-10\s+%scpu-60\s+%scpu-300\s+%scpu$/ ) { - #15:20:52 %scpu-10 %scpu-60 %scpu-300 %scpu - $scan_mode = "cpu-starved"; - } elsif ( /(\d+:\d+:\d+)\s+runq-sz\s+plist-sz\s+ldavg-1\s+ldavg-5\s+ldavg-15\s+blocked$/ ) { - #16:19:36 runq-sz plist-sz ldavg-1 ldavg-5 ldavg-15 blocked - $scan_mode = "task-lists"; - } elsif ($scan_mode eq "cpu-starved") { - if ( /(\d+):(\d+):(\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)$/ ) { - #15:20:55 7.12 2.37 2.33 5.14 - my $hour = $1; - my $min = $2; - my $sec = $3; - my %cpu_starved; - $cpu_starved{'010s'} = $4; - $cpu_starved{'060s'} = $5; - $cpu_starved{'300s'} = $6; - $cpu_starved{'last_interval'} = $7; - - $hms_ms = get_hms_ms($hour, $min, $sec); - if (defined $prev_hms_ms and $prev_hms_ms > $hms_ms) { - # hour-minute-second is lower than last reading so one day has passed - $ymd_timestamp_ms = advance_ymd($ymd_timestamp_ms); - } - my %sample = ('end' => $ymd_timestamp_ms + $hms_ms); - for my $time_window (keys %cpu_starved) { - $desc{'type'} = '%-Time-Tasks-CPU-Starved-' . $time_window; - $sample{'value'} = $cpu_starved{$time_window}; - log_sample($source, \%desc, \%no_names, \%sample); - } - } elsif ($_ eq '') { - $scan_mode = ''; - } - } elsif ($scan_mode eq "task-lists") { - if ( /(\d+):(\d+):(\d+)\s+(\d+)\s+(\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+)$/ ) { - #16:19:39 1 551 0.50 0.13 0.04 0 - my $hour = $1; - my $min = $2; - my $sec = $3; - my $run_queue_length = $4; - my $process_list_size = $5; - my %load_average; - $load_average{'01m'} = $6; - $load_average{'05m'} = $7; - $load_average{'15m'} = $8; - my $io_blocked_tasks = $9; - $hms_ms = get_hms_ms($hour, $min, $sec); - if (defined $prev_hms_ms and $prev_hms_ms > $hms_ms) { - # hour-minute-second is lower than last reading so one day has passed - $ymd_timestamp_ms = advance_ymd($ymd_timestamp_ms); - } - my %sample = ('end' => $ymd_timestamp_ms + $hms_ms); - $desc{'type'} = 'Run-Queue-Length'; - $sample{'value'} = $run_queue_length; - log_sample($source, \%desc, \%no_names, \%sample); - $desc{'type'} = 'Process-List-Size'; - $sample{'value'} = $process_list_size; - log_sample($source, \%desc, \%no_names, \%sample); - for my $time_window (keys %load_average) { - $desc{'type'} = 'Load-Average-' . $time_window; - $sample{'value'} = $load_average{$time_window}; - log_sample($source, \%desc, \%no_names, \%sample); - } - $desc{'type'} = 'IO-Blocked-Tasks'; - $sample{'value'} = $io_blocked_tasks; - log_sample($source, \%desc, \%no_names, \%sample); - } elsif ($_ eq '') { - $scan_mode = ''; - } - } - } elsif ($source eq "sar-net") { - if ( /(\d+:\d+:\d+)\s+IFACE\s+rxpck\/s\s+txpck\/s\s+rxkB\/s\s+txkB\/s\s+rxcmp\/s\s+txcmp\/s\s+rxmcst\/s\s+%ifutil$/ ) { - #22:28:54 IFACE rxpck/s txpck/s rxkB/s txkB/s rxcmp/s txcmp/s rxmcst/s %ifutil - $scan_mode = "net"; - } elsif ($scan_mode eq "net") { - if (/(\d+):(\d+):(\d+)\s+(\S+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)/) { - #22:28:57 eth1 0.00 4.33 0.00 0.40 0.00 0.00 0.00 0.00 - my $hour = $1; - my $min = $2; - my $sec = $3; - my $dev = $4; - my $rxpack = $5; - my $txpack = $6; - my $rxkB = $7; - my $txkB = $8; - $hms_ms = get_hms_ms($hour, $min, $sec); - if (defined $prev_hms_ms and $prev_hms_ms > $hms_ms) { - # hour-minute-second is lower than last reading so one day has passed - $ymd_timestamp_ms = advance_ymd($ymd_timestamp_ms); - } - my %sample = ('end' => $ymd_timestamp_ms + $hms_ms); - $desc{'type'} = 'L2-Gbps'; - for my $direction ('rx', 'tx') { - my %names = ('dev' => $dev, 'direction' => $direction, 'type' => get_netdev_type($dev, $netdev_types_ref)); - my $value; - if ($direction eq 'rx') { - $value = $rxkB / 1000000 * 8; - } else { - $value = $txkB / 1000000 * 8; - } - $sample{'value'} = $value; - log_sample($source, \%desc, \%names, \%sample); - } - $desc{'type'} = 'packets-sec'; - for my $direction ('rx', 'tx') { - my %names = ('dev' => $dev, 'direction' => $direction, 'type' => get_netdev_type($dev, $netdev_types_ref)); - my $value; - if ($direction eq 'rx') { - $value = $rxpack; - } else { - $value = $txpack; - } - $sample{'value'} = $value; - log_sample($source, \%desc, \%names, \%sample); - } - } elsif ($_ eq '') { - $scan_mode = ''; - } - } - if (/(\d+:\d+:\d+)\s+IFACE\s+rxerr\/s\s+txerr\/s\s+coll\/s\s+rxdrop\/s\s+txdrop\/s\s+txcarr\/s\s+rxfram\/s\s+rxfifo\/s\s+txfifo\/s$/) { - #15:42:48 IFACE rxerr/s txerr/s coll/s rxdrop/s txdrop/s txcarr/s rxfram/s rxfifo/s txfifo/s - $scan_mode = "net-error"; - } elsif ($scan_mode eq "net-error") { - if (/(\d+):(\d+):(\d+)\s+(\S+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)/) { - #22:28:57 eth1 0.00 4.33 0.00 0.40 0.00 0.00 0.00 0.00 - #15:42:51 vxlan_sys_4789 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 - my %networkerrors; - my $hour = $1; - my $min = $2; - my $sec = $3; - my $dev = $4; - $networkerrors{'tx'}{'collision'} = $7; - $networkerrors{'rx'}{'drop'} = $8; - $networkerrors{'tx'}{'drop'} = $9; - $networkerrors{'tx'}{'carrier'} = $10; - $networkerrors{'rx'}{'frame-alignment'} = $11; - $networkerrors{'rx'}{'fifo-overrun'} = $12; - $networkerrors{'tx'}{'fifo-overrun'} = $13; - $hms_ms = get_hms_ms($hour, $min, $sec); - if (defined $prev_hms_ms and $prev_hms_ms > $hms_ms) { - # hour-minute-second is lower than last reading so one day has passed - $ymd_timestamp_ms = advance_ymd($ymd_timestamp_ms); - } - my %sample = ('end' => $ymd_timestamp_ms + $hms_ms); - $desc{'type'} = 'errors-sec'; - foreach my $direction (keys %networkerrors) { # $direction keys are rx and tx - foreach my $flavor (keys %{$networkerrors{$direction}}) { # $flavor keys are collision|drop|frame-alignment|carrier|fifo-overrun - $sample{'value'} = $networkerrors{$direction}{$flavor}; - my %names = ('dev' => $dev, 'direction' => $direction, 'type' => get_netdev_type( $dev, $netdev_types_ref ), 'error' => $flavor); - log_sample($source, \%desc, \%names, \%sample); - } - } - } elsif ($_ eq '') { - $scan_mode = ''; - } - } - } - $prev_hms_ms = $hms_ms; - } - close($log_fh); - finish_samples; - printf "Post-processing for %s complete\n", $source; - exit; - } - } - printf "Waiting for %d sar post-processing forks\n", scalar @sar_pids; - while (wait() > -1) {} -} - - -my @mpstat_files = grep(/^mpstat.json(\.xz){0,1}$/, @files); -if (scalar @mpstat_files > 1) { - printf "ERROR: there should never be more than one mpstat file\n%s\n", @mpstat_files; -} -if (scalar @mpstat_files == 0) { - printf "ERROR: there is no mpstat file\n"; -} -if (scalar @mpstat_files == 1) { - my $log_file = $mpstat_files[0];; - printf "Found %s\n", $log_file; - my $cpu_topo_ref = build_cpu_topology("sys/devices/system/cpu"); - my $num_mpstat_forks = 8; - for (my $i = 0; $i < $num_mpstat_forks; $i++) { - if (my $pid = fork) { - push(@mpstat_pids, $pid); - } else { - printf "Post-processing for mpstat-%d started\n", $i; - my $log_fh = new IO::Uncompress::UnXz $log_file, Transparent => 1 || die "[ERROR]could not open file " . $log_file; - my $ymd_timestamp_ms; # Epochtime in milliseconds - my %desc = ('class' => 'throughput', 'source' => 'mpstat'); - my %sample; - my $hms_ms; - my $prev_hms_ms; - my $coder = JSON::XS->new; - while (<$log_fh>) { - chomp; - if (/"date": "(\d+-\d+-\d+)",/) { - # "date": "2021-04-26", - my $ymd = $1; - $ymd_timestamp_ms = `date +%s%N -d $ymd -u` / 1000000; - } elsif (/"timestamp": "(\d+):(\d+):(\d+)",/) { - $hms_ms = get_hms_ms($1, $2, $3); - if (defined $prev_hms_ms and $prev_hms_ms > $hms_ms) { - # hour-minute-second is lower than last reading so one day has passed - $ymd_timestamp_ms = advance_ymd($ymd_timestamp_ms); - } - $sample{'end'} = $ymd_timestamp_ms + $hms_ms; - } elsif ( /(\{"cpu": "\d+".*\}),{0,1}$/ ) { - my $cpu_entry = $coder->decode($1) || die "Could not read JSON"; - if ($$cpu_entry{'cpu'} % $num_mpstat_forks == $i) { - (my $package, my $die, my $core, my $thread) = get_cpu_topology($$cpu_entry{'cpu'}, $cpu_topo_ref); - for my $cpu_type (grep(!/^cpu?/, (keys %{ $cpu_entry } ))) { - my %names = ('package' => $package, 'die' => $die, 'core' => $core, 'thread' => $thread, 'num' => $$cpu_entry{'cpu'}, 'type' => $cpu_type); - if ($cpu_type eq "idle" or $cpu_type eq "iowait" or $cpu_type eq "steal") { - $desc{'type'} = 'NonBusy-CPU'; - } else { - $desc{'type'} = 'Busy-CPU'; - } - $sample{'value'} = $$cpu_entry{$cpu_type} / 100; - log_sample("mpstat-" . $i, \%desc, \%names, \%sample); - } - } - } - $prev_hms_ms = $hms_ms; - } - finish_samples; - printf "Post-processing for mpstat-%d complete\n", $i; - exit; - } - } - printf "Waiting for %d mpstat post-processing forks\n", scalar @mpstat_pids; - while (wait() > -1) {} -} - -my @iostat_files = grep(/^iostat.json(\.xz){0,1}$/, @files); -if (scalar @mpstat_files > 1) { - printf "ERROR: there should never be more than one iostat file\n%s\n", @iostat_files; -} -if (scalar @iostat_files == 0) { - printf "ERROR: there is no iostat file\n"; -} -if (scalar @iostat_files == 1) { - my $log_file = $iostat_files[0];; - printf "Post-processing for iostat started\n"; - (my $rc, my $fh) = open_read_text_file($log_file); - if ($rc > 0 or ! defined $fh) { - printf "sysstat-post-process: open_read_text_file() failed on %s\n", $log_file; - exit 0; - } - my $time_ms; - while (<$fh>) { - chomp; - if (m/\s*"timestamp"\:\s*"(.*)".*?/) { - $time_ms = Time::Piece->strptime($1,'%Y-%m-%dT%T%z')->epoch * 1000; - } - elsif (m/\s+({.*disk_device.*}).*?/) { - my $iosample_ref = decode_json($1); - my %desc = ('source' => 'iostat'); - foreach my $field_name (grep(!/^disk_device$/, keys %$iosample_ref)) { - my %sample = ('value'=> $iosample_ref->{$field_name}, 'end' => $time_ms); - my %names = ('dev' => $iosample_ref->{disk_device}); - if ($field_name =~ /(.*)(\/s|util)$/) { - $desc{'class'} = "throughput"; - my $oper = $1; - if ($oper =~ /^(w|r|d|f)(.*)$/) { - $names{'cmd'} = $1; - $desc{'type'} = $2; - s/^r$/read/, s/^d$/discard/, s/^w$/write/, s/^f$/flush/ for $names{'cmd'}; - s/^$/operations-sec/, s/^rqm$/operations-merged-sec/, s/^kB$/kB-sec/, s/^qm$/request-merges-sec/ for $desc{'type'}; - } elsif ($field_name =~ /^util$/) { - $desc{'type'} = "percent-utilization"; - } - } else { - $desc{'class'} = "count"; - if ($field_name =~ /^(w|r|d|f)(.+)$/) { - $names{'cmd'} = $1; - $desc{'type'} = $2; - s/^r$/read/, s/^d$/discard/, s/^w$/write/, s/^f$/flush/ for $names{'cmd'}; - s/^rqm$/percent-merged/, s/^_await$/avg-service-time-ms/, s/^areq-sz$/avg-req-size-kB/ for $desc{'type'}; - } else { - if ($field_name =~ /^aqu-sz$/) { - $desc{'type'} = "avg-queue-length"; - } - } - } - if (defined $desc{'type'} and defined $sample{'end'} and defined $sample{'value'}) { - log_sample("iostat", \%desc, \%names, \%sample) unless ($field_name eq "disk_device"); - } - } - } - } - close($fh); - finish_samples; -} -my @pidstat_files = grep(/^pidstat-stdout.txt(\.xz){0,1}$/, @files); -if (scalar @pidstat_files > 1) { - printf "ERROR: there should never be more than one pidstat file\n%s\n", @pidstat_files; -} -if (scalar @pidstat_files == 0) { - printf "WARNING: there is no pidstat file\n"; -} -if (scalar @pidstat_files == 1) { - my $log_file = $pidstat_files[0]; - printf "Post-processing for pidstat started\n"; - - # Set to 0 to include all PIDs regardless of activity - my $skip_zero_pids = 1; - - my $pidstat_data_regex = qr/^(\d+):(\d+):(\d+)\s+(\d+)\s+(\d+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\d+)\s+(.+?)\s*$/; - - # Pass 1: find PIDs with any non-zero values - my %active_pids; - if ($skip_zero_pids) { - (my $rc1, my $fh1) = open_read_text_file($log_file); - if ($rc1 > 0 or ! defined $fh1) { - printf "sysstat-post-process: open_read_text_file() failed on %s for pass 1\n", $log_file; - } else { - while (<$fh1>) { - chomp; - if (/$pidstat_data_regex/) { - my $pid = $5; - my $usr = $6; - my $system = $7; - my $guest = $8; - my $wait = $9; - my $cpu = $10; - if ($usr > 0 || $system > 0 || $guest > 0 || $wait > 0 || $cpu > 0) { - $active_pids{$pid} = 1; - } - } - } - close($fh1); - printf "pidstat pass 1: found %d active PIDs out of total\n", scalar keys %active_pids; - } - } - - # Pass 2: process data and emit metrics - (my $rc, my $fh) = open_read_text_file($log_file); - if ($rc > 0 or ! defined $fh) { - printf "sysstat-post-process: open_read_text_file() failed on %s\n", $log_file; - exit 0; - } - my $ymd_timestamp_ms; - my $prev_hms_ms; - while (<$fh>) { - chomp; - if (/^Linux\s\S+\s\S+\s+(\d+-\d+-\d+)\s+\S+\s+\S+/) { - my $ymd = $1; - $ymd_timestamp_ms = `date +%s%N -d $ymd -u` / 1000000; - } - if (/$pidstat_data_regex/) { - my $hour = $1; - my $min = $2; - my $sec = $3; - my $pid = $5; - my $usr = $6; - my $system = $7; - my $guest = $8; - my $wait = $9; - my $cpu = $10; - my $command = $12; - - if ($skip_zero_pids && ! exists $active_pids{$pid}) { - next; - } - - my $hms_ms = get_hms_ms($hour, $min, $sec); - if (defined $prev_hms_ms && $hms_ms < $prev_hms_ms) { - $ymd_timestamp_ms = advance_ymd($ymd_timestamp_ms); - } - $prev_hms_ms = $hms_ms; - my $time_ms = $ymd_timestamp_ms + $hms_ms; - - my %desc = ('source' => 'pidstat', 'class' => 'throughput'); - - my %fields = ('usr' => $usr, 'system' => $system, 'guest' => $guest, 'wait' => $wait); - for my $field_name (keys %fields) { - my %names = ('cmd' => $command, 'pid' => $pid, 'type' => $field_name); - if ($field_name eq 'wait') { - $desc{'type'} = 'NonBusy-CPU'; - } else { - $desc{'type'} = 'Busy-CPU'; - } - my %sample = ('end' => $time_ms, 'value' => $fields{$field_name}); - log_sample("pidstat", \%desc, \%names, \%sample); - } - } - } - close($fh); - finish_samples; - printf "Post-processing for pidstat complete\n"; -} - -print "All sysstat post-processing is complete\n"; -while (wait() > -1) {} -closedir $dh; diff --git a/sysstat-post-process.py b/sysstat-post-process.py new file mode 100755 index 0000000..52cabbf --- /dev/null +++ b/sysstat-post-process.py @@ -0,0 +1,584 @@ +#!/usr/bin/env python3 +# -*- mode: python; indent-tabs-mode: nil; python-indent-level: 4 -*- +# vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python + +import json +import os +import re +import subprocess +import sys +import threading +from pathlib import Path + +TOOLBOX_HOME = os.environ.get("TOOLBOX_HOME") +if TOOLBOX_HOME: + sys.path.append(str(Path(TOOLBOX_HOME) / "python")) + +from toolbox.cdm_metrics import CDMMetrics +from toolbox.fileio import open_read_text_file +from toolbox.system_cpu_topology import build_cpu_topology, get_cpu_topology + + +def build_netdev_types(): + netdev_types = {} + netdev_file = "netdev-types.txt" + if os.path.isfile(netdev_file): + try: + fh, _ = open_read_text_file(netdev_file) + for line in fh: + line = line.strip() + m = re.match(r'^(\S+)\s+\.\./\.\./devices/(\S+)$', line) + if m: + netdev_types[m.group(1)] = m.group(2) + fh.close() + except FileNotFoundError: + pass + return netdev_types + + +def get_netdev_type(name, netdev_types): + if name in netdev_types: + if netdev_types[name].startswith("pci"): + return "physical" + elif netdev_types[name].startswith("virtual"): + return "virtual" + else: + return "unknown" + return "unknown" + + +def get_hms_ms(hour, minute, sec): + return 1000 * (int(hour) * 3600 + int(minute) * 60 + int(sec)) + + +def advance_ymd(ymd_ms): + return ymd_ms + (1000 * 60 * 60 * 24) + + +def ymd_to_epoch_ms(ymd): + result = subprocess.run( + ["date", "+%s%N", "-d", ymd, "-u"], + capture_output=True, text=True + ) + return int(result.stdout.strip()) // 1000000 + + + +def process_sar(source, log_file): + print(f"Post-processing for {source} started") + metrics = CDMMetrics() + netdev_types = build_netdev_types() + ymd_timestamp_ms = None + scan_mode = "" + hms_ms = None + prev_hms_ms = None + no_names = {} + desc = {"class": "throughput", "source": source} + + try: + fh, _ = open_read_text_file(log_file) + except FileNotFoundError: + print(f"ERROR: could not open {log_file}") + return + + for line in fh: + line = line.rstrip("\n") + + m = re.match(r'^Linux\s\S+\s\S+\s+(\d+-\d+-\d+)\s+\S+\s+\S+', line) + if m: + ymd_timestamp_ms = ymd_to_epoch_ms(m.group(1)) + continue + + if source == "sar-mem": + if re.search(r'pgpgin/s\s+pgpgout/s\s+fault/s\s+majflt/s\s+pgfree/s\s+pgscank/s\s+pgscand/s\s+pgsteal/s\s+%vmeff$', line): + scan_mode = "paging" + elif re.search(r'pswpin/s\s+pswpout/s$', line): + scan_mode = "swapping" + elif re.search(r'%smem-10\s+%smem-60\s+%smem-300\s+%smem\s+%fmem-10\s+%fmem-60\s+%fmem-300\s+%fmem$', line): + scan_mode = "memory-starved" + elif re.search(r'kbmemfree\s+kbavail\s+kbmemused\s+%memused\s+kbbuffers\s+kbcached\s+kbcommit\s+%commit\s+kbactive\s+kbinact\s+kbdirty$', line): + scan_mode = "memory-utilization" + elif scan_mode == "paging": + m = re.match(r'(\d+):(\d+):(\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)$', line) + if m: + hms_ms = get_hms_ms(m.group(1), m.group(2), m.group(3)) + if prev_hms_ms is not None and prev_hms_ms > hms_ms: + ymd_timestamp_ms = advance_ymd(ymd_timestamp_ms) + sample = {"end": ymd_timestamp_ms + hms_ms} + for metric_type, val in [ + ("KB-Paged-in-sec", m.group(4)), + ("KB-Paged-out-sec", m.group(5)), + ("Pages-freed-sec", m.group(8)), + ("kswapd-scanned-pages-sec", m.group(9)), + ("scanned-pages-sec", m.group(10)), + ("reclaimed-pages-sec", m.group(11)), + ("VM-Efficiency", m.group(12)), + ]: + desc["type"] = metric_type + sample["value"] = float(val) + metrics.log_sample(source, desc, no_names, sample) + faults_minor = float(m.group(6)) - float(m.group(7)) + faults_major = float(m.group(7)) + desc["type"] = "Page-faults-sec" + for fault_type, val in [("minor", faults_minor), ("major", faults_major)]: + sample["value"] = val + metrics.log_sample(source, desc, {"type": fault_type}, sample) + elif line == "": + scan_mode = "" + elif scan_mode == "swapping": + m = re.match(r'(\d+):(\d+):(\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)$', line) + if m: + hms_ms = get_hms_ms(m.group(1), m.group(2), m.group(3)) + if prev_hms_ms is not None and prev_hms_ms > hms_ms: + ymd_timestamp_ms = advance_ymd(ymd_timestamp_ms) + sample = {"end": ymd_timestamp_ms + hms_ms} + desc["type"] = "Pages-swapped-in-sec" + sample["value"] = float(m.group(4)) + metrics.log_sample(source, desc, no_names, sample) + desc["type"] = "Pages-swapped-out-sec" + sample["value"] = float(m.group(5)) + metrics.log_sample(source, desc, no_names, sample) + elif line == "": + scan_mode = "" + elif scan_mode == "memory-starved": + m = re.match(r'(\d+):(\d+):(\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)$', line) + if m: + hms_ms = get_hms_ms(m.group(1), m.group(2), m.group(3)) + if prev_hms_ms is not None and prev_hms_ms > hms_ms: + ymd_timestamp_ms = advance_ymd(ymd_timestamp_ms) + sample = {"end": ymd_timestamp_ms + hms_ms} + for tw, val in [("010s", m.group(4)), ("060s", m.group(5)), ("300s", m.group(6)), ("last_interval", m.group(7))]: + desc["type"] = f"%-Time-Tasks-Waiting-on-Memory-{tw}" + sample["value"] = float(val) + metrics.log_sample(source, desc, no_names, sample) + for tw, val in [("010s", m.group(8)), ("060s", m.group(9)), ("300s", m.group(10)), ("last_interval", m.group(11))]: + desc["type"] = f"%-Time-Non-Idle-Tasks-Stalled-on-Memory-{tw}" + sample["value"] = float(val) + metrics.log_sample(source, desc, no_names, sample) + elif line == "": + scan_mode = "" + elif scan_mode == "memory-utilization": + m = re.match(r'(\d+):(\d+):(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+\.\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+\.\d+)\s+(\d+)\s+(\d+)\s+(\d+)$', line) + if m: + hms_ms = get_hms_ms(m.group(1), m.group(2), m.group(3)) + if prev_hms_ms is not None and prev_hms_ms > hms_ms: + ymd_timestamp_ms = advance_ymd(ymd_timestamp_ms) + sample = {"end": ymd_timestamp_ms + hms_ms} + for metric_type, val in [ + ("Memory-Free-KB", m.group(4)), + ("Memory-Available-KB", m.group(5)), + ("Memory-Used-KB", m.group(6)), + ("Memory-Used-Percent", m.group(7)), + ("Memory-Buffers-KB", m.group(8)), + ("Memory-Cached-KB", m.group(9)), + ("Memory-Commit-KB", m.group(10)), + ("Memory-Commit-Percent", m.group(11)), + ("Memory-Active-KB", m.group(12)), + ("Memory-Inactive-KB", m.group(13)), + ("Memory-Dirty-KB", m.group(14)), + ]: + desc["type"] = metric_type + sample["value"] = float(val) + metrics.log_sample(source, desc, no_names, sample) + elif line == "": + scan_mode = "" + + elif source == "sar-io": + if re.search(r'%sio-10\s+%sio-60\s+%sio-300\s+%sio\s+%fio-10\s+%fio-60\s+%fio-300\s+%fio$', line): + scan_mode = "io-starved" + elif scan_mode == "io-starved": + m = re.match(r'(\d+):(\d+):(\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)$', line) + if m: + hms_ms = get_hms_ms(m.group(1), m.group(2), m.group(3)) + if prev_hms_ms is not None and prev_hms_ms > hms_ms: + ymd_timestamp_ms = advance_ymd(ymd_timestamp_ms) + sample = {"end": ymd_timestamp_ms + hms_ms} + for tw, val in [("010s", m.group(4)), ("060s", m.group(5)), ("300s", m.group(6)), ("last_interval", m.group(7))]: + desc["type"] = f"%-Time-Tasks-Lost-Waiting-on-IO-{tw}" + sample["value"] = float(val) + metrics.log_sample(source, desc, no_names, sample) + for tw, val in [("010s", m.group(8)), ("060s", m.group(9)), ("300s", m.group(10)), ("last_interval", m.group(11))]: + desc["type"] = f"%-Time-Tasks-Stalled-Waiting-on-IO-{tw}" + sample["value"] = float(val) + metrics.log_sample(source, desc, no_names, sample) + elif line == "": + scan_mode = "" + + elif source == "sar-tasks": + if re.search(r'proc/s\s+cswch/s$', line): + scan_mode = "task" + elif scan_mode == "task": + m = re.match(r'(\d+):(\d+):(\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)$', line) + if m: + hms_ms = get_hms_ms(m.group(1), m.group(2), m.group(3)) + if prev_hms_ms is not None and prev_hms_ms > hms_ms: + ymd_timestamp_ms = advance_ymd(ymd_timestamp_ms) + sample = {"end": ymd_timestamp_ms + hms_ms} + desc["type"] = "Processes-created-sec" + sample["value"] = float(m.group(4)) + metrics.log_sample(source, desc, no_names, sample) + desc["type"] = "Context-switches-sec" + sample["value"] = float(m.group(5)) + metrics.log_sample(source, desc, no_names, sample) + elif line == "": + scan_mode = "" + + elif source == "sar-scheduler": + if re.search(r'%scpu-10\s+%scpu-60\s+%scpu-300\s+%scpu$', line): + scan_mode = "cpu-starved" + elif re.search(r'runq-sz\s+plist-sz\s+ldavg-1\s+ldavg-5\s+ldavg-15\s+blocked$', line): + scan_mode = "task-lists" + elif scan_mode == "cpu-starved": + m = re.match(r'(\d+):(\d+):(\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)$', line) + if m: + hms_ms = get_hms_ms(m.group(1), m.group(2), m.group(3)) + if prev_hms_ms is not None and prev_hms_ms > hms_ms: + ymd_timestamp_ms = advance_ymd(ymd_timestamp_ms) + sample = {"end": ymd_timestamp_ms + hms_ms} + for tw, val in [("010s", m.group(4)), ("060s", m.group(5)), ("300s", m.group(6)), ("last_interval", m.group(7))]: + desc["type"] = f"%-Time-Tasks-CPU-Starved-{tw}" + sample["value"] = float(val) + metrics.log_sample(source, desc, no_names, sample) + elif line == "": + scan_mode = "" + elif scan_mode == "task-lists": + m = re.match(r'(\d+):(\d+):(\d+)\s+(\d+)\s+(\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+)$', line) + if m: + hms_ms = get_hms_ms(m.group(1), m.group(2), m.group(3)) + if prev_hms_ms is not None and prev_hms_ms > hms_ms: + ymd_timestamp_ms = advance_ymd(ymd_timestamp_ms) + sample = {"end": ymd_timestamp_ms + hms_ms} + desc["type"] = "Run-Queue-Length" + sample["value"] = float(m.group(4)) + metrics.log_sample(source, desc, no_names, sample) + desc["type"] = "Process-List-Size" + sample["value"] = float(m.group(5)) + metrics.log_sample(source, desc, no_names, sample) + for tw, val in [("01m", m.group(6)), ("05m", m.group(7)), ("15m", m.group(8))]: + desc["type"] = f"Load-Average-{tw}" + sample["value"] = float(val) + metrics.log_sample(source, desc, no_names, sample) + desc["type"] = "IO-Blocked-Tasks" + sample["value"] = float(m.group(9)) + metrics.log_sample(source, desc, no_names, sample) + elif line == "": + scan_mode = "" + + elif source == "sar-net": + if re.search(r'IFACE\s+rxpck/s\s+txpck/s\s+rxkB/s\s+txkB/s\s+rxcmp/s\s+txcmp/s\s+rxmcst/s\s+%ifutil$', line): + scan_mode = "net" + elif scan_mode == "net": + m = re.match(r'(\d+):(\d+):(\d+)\s+(\S+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)', line) + if m: + hms_ms = get_hms_ms(m.group(1), m.group(2), m.group(3)) + if prev_hms_ms is not None and prev_hms_ms > hms_ms: + ymd_timestamp_ms = advance_ymd(ymd_timestamp_ms) + sample = {"end": ymd_timestamp_ms + hms_ms} + dev = m.group(4) + rxkB = float(m.group(7)) + txkB = float(m.group(8)) + rxpack = float(m.group(5)) + txpack = float(m.group(6)) + desc["type"] = "L2-Gbps" + for direction, kB in [("rx", rxkB), ("tx", txkB)]: + names = {"dev": dev, "direction": direction, "type": get_netdev_type(dev, netdev_types)} + sample["value"] = kB / 1000000 * 8 + metrics.log_sample(source, desc, names, sample) + desc["type"] = "packets-sec" + for direction, pkt in [("rx", rxpack), ("tx", txpack)]: + names = {"dev": dev, "direction": direction, "type": get_netdev_type(dev, netdev_types)} + sample["value"] = pkt + metrics.log_sample(source, desc, names, sample) + elif line == "": + scan_mode = "" + + if re.search(r'IFACE\s+rxerr/s\s+txerr/s\s+coll/s\s+rxdrop/s\s+txdrop/s\s+txcarr/s\s+rxfram/s\s+rxfifo/s\s+txfifo/s$', line): + scan_mode = "net-error" + elif scan_mode == "net-error": + m = re.match(r'(\d+):(\d+):(\d+)\s+(\S+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)\s+(\d+\.\d+)', line) + if m: + hms_ms = get_hms_ms(m.group(1), m.group(2), m.group(3)) + if prev_hms_ms is not None and prev_hms_ms > hms_ms: + ymd_timestamp_ms = advance_ymd(ymd_timestamp_ms) + sample = {"end": ymd_timestamp_ms + hms_ms} + dev = m.group(4) + desc["type"] = "errors-sec" + errors = { + "tx": {"collision": float(m.group(7)), "drop": float(m.group(9)), "carrier": float(m.group(10)), "fifo-overrun": float(m.group(13))}, + "rx": {"drop": float(m.group(8)), "frame-alignment": float(m.group(11)), "fifo-overrun": float(m.group(12))}, + } + for direction in errors: + for flavor in errors[direction]: + sample["value"] = errors[direction][flavor] + names = {"dev": dev, "direction": direction, "type": get_netdev_type(dev, netdev_types), "error": flavor} + metrics.log_sample(source, desc, names, sample) + elif line == "": + scan_mode = "" + + prev_hms_ms = hms_ms + + fh.close() + metrics.finish_samples() + print(f"Post-processing for {source} complete") + + +def process_mpstat(fork_idx, num_forks, log_file, cpu_topo): + print(f"Post-processing for mpstat-{fork_idx} started") + metrics = CDMMetrics() + ymd_timestamp_ms = None + hms_ms = None + prev_hms_ms = None + desc = {"class": "throughput", "source": "mpstat"} + sample = {} + + try: + fh, _ = open_read_text_file(log_file) + except FileNotFoundError: + print(f"ERROR: could not open {log_file}") + return + + for line in fh: + line = line.rstrip("\n") + + m = re.search(r'"date": "(\d+-\d+-\d+)"', line) + if m: + ymd_timestamp_ms = ymd_to_epoch_ms(m.group(1)) + continue + + m = re.search(r'"timestamp": "(\d+):(\d+):(\d+)"', line) + if m: + hms_ms = get_hms_ms(m.group(1), m.group(2), m.group(3)) + if prev_hms_ms is not None and prev_hms_ms > hms_ms: + ymd_timestamp_ms = advance_ymd(ymd_timestamp_ms) + sample["end"] = ymd_timestamp_ms + hms_ms + prev_hms_ms = hms_ms + continue + + m = re.match(r'\s*(\{"cpu": "\d+".*\}),?$', line) + if m: + try: + cpu_entry = json.loads(m.group(1)) + except json.JSONDecodeError: + continue + cpu_num = int(cpu_entry["cpu"]) + if cpu_num % num_forks != fork_idx: + continue + package, die, core, thread = get_cpu_topology(cpu_num, cpu_topo) + for cpu_type in cpu_entry: + if cpu_type == "cpu": + continue + names = {"package": package, "die": die, "core": core, "thread": thread, "num": cpu_num, "type": cpu_type} + if cpu_type in ("idle", "iowait", "steal"): + desc["type"] = "NonBusy-CPU" + else: + desc["type"] = "Busy-CPU" + sample["value"] = cpu_entry[cpu_type] / 100 + metrics.log_sample(f"mpstat-{fork_idx}", desc, names, sample) + + prev_hms_ms = hms_ms + + fh.close() + metrics.finish_samples() + print(f"Post-processing for mpstat-{fork_idx} complete") + + +def process_iostat(log_file): + print("Post-processing for iostat started") + metrics = CDMMetrics() + + try: + fh, _ = open_read_text_file(log_file) + except FileNotFoundError: + print(f"ERROR: could not open {log_file}") + return + + time_ms = None + from datetime import datetime, timezone + + for line in fh: + line = line.rstrip("\n") + m = re.search(r'\s*"timestamp"\:\s*"(.*?)"', line) + if m: + dt = datetime.strptime(m.group(1), "%Y-%m-%dT%H:%M:%S%z") + time_ms = int(dt.timestamp() * 1000) + continue + + m = re.search(r'\s+({.*disk_device.*})', line) + if m: + try: + io_sample = json.loads(m.group(1)) + except json.JSONDecodeError: + continue + desc = {"source": "iostat"} + for field_name in io_sample: + if field_name == "disk_device": + continue + sample = {"value": io_sample[field_name], "end": time_ms} + names = {"dev": io_sample["disk_device"]} + + rate_m = re.match(r'(.*)(\/s|util)$', field_name) + if rate_m: + desc["class"] = "throughput" + oper = rate_m.group(1) + cmd_m = re.match(r'^(w|r|d|f)(.*)$', oper) + if cmd_m: + cmd_map = {"r": "read", "w": "write", "d": "discard", "f": "flush"} + type_map = {"": "operations-sec", "rqm": "operations-merged-sec", "kB": "kB-sec", "qm": "request-merges-sec"} + names["cmd"] = cmd_map.get(cmd_m.group(1), cmd_m.group(1)) + desc["type"] = type_map.get(cmd_m.group(2), cmd_m.group(2)) + elif field_name == "util": + desc["type"] = "percent-utilization" + else: + desc["class"] = "count" + cmd_m = re.match(r'^(w|r|d|f)(.+)$', field_name) + if cmd_m: + cmd_map = {"r": "read", "w": "write", "d": "discard", "f": "flush"} + type_map = {"rqm": "percent-merged", "_await": "avg-service-time-ms", "areq-sz": "avg-req-size-kB"} + names["cmd"] = cmd_map.get(cmd_m.group(1), cmd_m.group(1)) + desc["type"] = type_map.get(cmd_m.group(2), cmd_m.group(2)) + elif field_name == "aqu-sz": + desc["type"] = "avg-queue-length" + + if "type" in desc and time_ms is not None and sample["value"] is not None: + metrics.log_sample("iostat", desc, names, sample) + + fh.close() + metrics.finish_samples() + print("Post-processing for iostat complete") + + +def process_pidstat(log_file): + print("Post-processing for pidstat started") + metrics = CDMMetrics() + skip_zero_pids = True + data_re = re.compile( + r'^(\d+):(\d+):(\d+)\s+(\d+)\s+(\d+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\d+)\s+(.+?)\s*$' + ) + + active_pids = set() + if skip_zero_pids: + try: + fh1, _ = open_read_text_file(log_file) + for line in fh1: + m = data_re.match(line.rstrip("\n")) + if m: + pid = m.group(5) + if any(float(m.group(i)) > 0 for i in (6, 7, 8, 9, 10)): + active_pids.add(pid) + fh1.close() + print(f"pidstat pass 1: found {len(active_pids)} active PIDs out of total") + except FileNotFoundError: + print(f"ERROR: could not open {log_file} for pass 1") + return + + try: + fh, _ = open_read_text_file(log_file) + except FileNotFoundError: + print(f"ERROR: could not open {log_file}") + return + + ymd_timestamp_ms = None + prev_hms_ms = None + + for line in fh: + line = line.rstrip("\n") + m = re.match(r'^Linux\s\S+\s\S+\s+(\d+-\d+-\d+)\s+\S+\s+\S+', line) + if m: + ymd_timestamp_ms = ymd_to_epoch_ms(m.group(1)) + continue + + m = data_re.match(line) + if m: + pid = m.group(5) + if skip_zero_pids and pid not in active_pids: + continue + + hms_ms = get_hms_ms(m.group(1), m.group(2), m.group(3)) + if prev_hms_ms is not None and hms_ms < prev_hms_ms: + ymd_timestamp_ms = advance_ymd(ymd_timestamp_ms) + prev_hms_ms = hms_ms + time_ms = ymd_timestamp_ms + hms_ms + + command = m.group(12) + desc = {"source": "pidstat", "class": "throughput"} + fields = {"usr": float(m.group(6)), "system": float(m.group(7)), "guest": float(m.group(8)), "wait": float(m.group(9))} + + for field_name, val in fields.items(): + names = {"cmd": command, "pid": pid, "type": field_name} + desc["type"] = "NonBusy-CPU" if field_name == "wait" else "Busy-CPU" + sample = {"end": time_ms, "value": val} + metrics.log_sample("pidstat", desc, names, sample) + + fh.close() + metrics.finish_samples() + print("Post-processing for pidstat complete") + + +def main(): + print("sysstat-post-process") + + files = sorted(os.listdir(".")) + print(f"files to process:\n {' '.join(files)}") + threads = [] + + # SAR processing + sar_files = [f for f in files if re.match(r'^sar-stdout\.txt(\.xz)?$', f)] + if len(sar_files) > 1: + print(f"ERROR: there should never be more than one sar file: {sar_files}") + elif len(sar_files) == 0: + print("ERROR: there is no sar file") + else: + log_file = sar_files[0] + print(f"Found {log_file}") + for source in ("sar-mem", "sar-scheduler", "sar-io", "sar-tasks", "sar-net"): + t = threading.Thread(target=process_sar, args=(source, log_file)) + t.start() + threads.append(t) + print(f"Waiting for {len(threads)} sar post-processing threads") + for t in threads: + t.join() + threads = [] + + # mpstat processing + mpstat_files = [f for f in files if re.match(r'^mpstat\.json(\.xz)?$', f)] + if len(mpstat_files) > 1: + print(f"ERROR: there should never be more than one mpstat file: {mpstat_files}") + elif len(mpstat_files) == 0: + print("ERROR: there is no mpstat file") + else: + log_file = mpstat_files[0] + print(f"Found {log_file}") + cpu_topo = build_cpu_topology("sys/devices/system/cpu") + num_forks = 8 + for i in range(num_forks): + t = threading.Thread(target=process_mpstat, args=(i, num_forks, log_file, cpu_topo)) + t.start() + threads.append(t) + print(f"Waiting for {len(threads)} mpstat post-processing threads") + for t in threads: + t.join() + threads = [] + + # iostat processing (sequential) + iostat_files = [f for f in files if re.match(r'^iostat\.json(\.xz)?$', f)] + if len(iostat_files) > 1: + print(f"ERROR: there should never be more than one iostat file: {iostat_files}") + elif len(iostat_files) == 0: + print("ERROR: there is no iostat file") + else: + process_iostat(iostat_files[0]) + + # pidstat processing (sequential) + pidstat_files = [f for f in files if re.match(r'^pidstat-stdout\.txt(\.xz)?$', f)] + if len(pidstat_files) > 1: + print(f"ERROR: there should never be more than one pidstat file: {pidstat_files}") + elif len(pidstat_files) == 0: + print("WARNING: there is no pidstat file") + else: + process_pidstat(pidstat_files[0]) + + print("All sysstat post-processing is complete") + + +if __name__ == "__main__": + main()