From dc8ca0682ffd86cf2837dc30ac793c3c595709a1 Mon Sep 17 00:00:00 2001 From: Victor Boglea Date: Fri, 5 Jun 2026 13:21:40 +0200 Subject: [PATCH] feat: add missing redis-backed collectors --- README.md | 66 +++- cmd/sonic-exporter/main.go | 20 + fixtures/test/appl_db_data.json | 26 ++ fixtures/test/state_db_data.json | 119 ++++++ internal/collector/collector_test.go | 227 +++++++++++ internal/collector/lldp_collector.go | 20 +- .../collector/platform_health_collector.go | 349 +++++++++++++++++ internal/collector/redis_value_helpers.go | 78 ++++ internal/collector/routing_collector.go | 286 ++++++++++++++ internal/collector/switch_collector.go | 252 ++++++++++++ internal/collector/thermal_collector.go | 221 +++++++++++ internal/collector/transceiver_collector.go | 358 ++++++++++++++++++ 12 files changed, 2018 insertions(+), 4 deletions(-) create mode 100644 internal/collector/platform_health_collector.go create mode 100644 internal/collector/redis_value_helpers.go create mode 100644 internal/collector/routing_collector.go create mode 100644 internal/collector/switch_collector.go create mode 100644 internal/collector/thermal_collector.go create mode 100644 internal/collector/transceiver_collector.go diff --git a/README.md b/README.md index c76eb6e..77252e9 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ flowchart LR subgraph sonic-exporter M[cmd/sonic-exporter/main.go] - COL[Collectors\ninterface, hw, crm, queue, lldp, vlan, lag, fdb\nsystem*, docker*, frr*] + COL[Collectors\ninterface, hw, crm, queue, lldp, vlan, lag, fdb\nswitch, thermal, transceiver\nrouting*, platform*, system*, docker*, frr*] CACHE[(In-memory metric cache)] NODE[node_exporter subset\nloadavg,cpu,diskstats,filesystem,meminfo,time,stat] end @@ -81,6 +81,11 @@ For a deeper breakdown, see `docs/architecture.md`. | LLDP | LLDP neighbors from Redis | Enabled | | VLAN | VLAN and VLAN member state | Enabled | | LAG | PortChannel and member state | Enabled | +| Switch | Switch-level Redis state from `APPL_DB` `SWITCH_TABLE` | Enabled | +| Thermal | ASIC and SFP max temperatures from `STATE_DB` | Enabled | +| Transceiver | Transceiver status, flags, and thresholds from `STATE_DB` | Enabled | +| Routing | Route and neighbor summaries from `APPL_DB` | Disabled (`ROUTING_ENABLED=false`) | +| Platform Health | Process, storage, and system health metrics from `STATE_DB` | Disabled (`PLATFORM_HEALTH_ENABLED=false`) | | FDB | FDB summary from ASIC DB | Disabled (`FDB_ENABLED=false`) | | System (experimental) | Switch identity, software metadata, uptime | Disabled (`SYSTEM_ENABLED=false`) | | Docker (experimental) | Container runtime metrics from `STATE_DB` | Disabled (`DOCKER_ENABLED=false`) | @@ -140,7 +145,7 @@ curl localhost:9101/metrics ### Docker deployment for SONiC -Optional collectors stay opt in. Keep `SYSTEM_ENABLED=false`, `DOCKER_ENABLED=false`, and `FRR_ENABLED=false` unless you need them. +Optional collectors stay opt in. Keep `ROUTING_ENABLED=false`, `PLATFORM_HEALTH_ENABLED=false`, `SYSTEM_ENABLED=false`, `DOCKER_ENABLED=false`, and `FRR_ENABLED=false` unless you need them. #### Recommended online switch flow @@ -191,6 +196,8 @@ REDIS_NETWORK=tcp REDIS_PASSWORD= SONIC_DISABLED_METRICS= FDB_ENABLED=false +ROUTING_ENABLED=false +PLATFORM_HEALTH_ENABLED=false SYSTEM_ENABLED=false DOCKER_ENABLED=false FRR_ENABLED=false @@ -212,6 +219,8 @@ sudo docker run -d \ -e REDIS_NETWORK=tcp \ -e SONIC_DISABLED_METRICS= \ -e FDB_ENABLED=false \ + -e ROUTING_ENABLED=false \ + -e PLATFORM_HEALTH_ENABLED=false \ -e SYSTEM_ENABLED=false \ -e DOCKER_ENABLED=false \ -e FRR_ENABLED=false \ @@ -244,7 +253,7 @@ Restart=always RestartSec=30 ExecStartPre=/bin/sh -c 'until /usr/bin/redis-cli -h 127.0.0.1 -p 6379 ping | /bin/grep -q PONG; do sleep 2; done' ExecStartPre=-/usr/bin/docker rm -f sonic-exporter -ExecStart=/usr/bin/docker run --name sonic-exporter --label app=sonic-exporter --label managed-by=systemd --restart no --network host -e REDIS_ADDRESS=127.0.0.1:6379 -e REDIS_NETWORK=tcp -e REDIS_PASSWORD= -e SONIC_DISABLED_METRICS= -e FDB_ENABLED=false -e SYSTEM_ENABLED=false -e DOCKER_ENABLED=false -e FRR_ENABLED=false ghcr.io/rokernel/sonic-exporter:v0.1.1 +ExecStart=/usr/bin/docker run --name sonic-exporter --label app=sonic-exporter --label managed-by=systemd --restart no --network host -e REDIS_ADDRESS=127.0.0.1:6379 -e REDIS_NETWORK=tcp -e REDIS_PASSWORD= -e SONIC_DISABLED_METRICS= -e FDB_ENABLED=false -e ROUTING_ENABLED=false -e PLATFORM_HEALTH_ENABLED=false -e SYSTEM_ENABLED=false -e DOCKER_ENABLED=false -e FRR_ENABLED=false ghcr.io/rokernel/sonic-exporter:v0.1.1 ExecStop=-/usr/bin/docker stop sonic-exporter ExecStopPost=-/usr/bin/docker rm -f sonic-exporter @@ -477,6 +486,52 @@ Be careful with broad patterns. A wide match can also hide health metrics such a | `FDB_MAX_PORTS` | Max per-port FDB series exported | `1024` | | `FDB_MAX_VLANS` | Max per-VLAN FDB series exported | `4096` | +### Switch collector + +| Variable | Description | Default | +|---|---|---| +| `SWITCH_ENABLED` | Enable switch collector | `true` | +| `SWITCH_REFRESH_INTERVAL` | Cache refresh interval | `60s` | +| `SWITCH_TIMEOUT` | Timeout for one refresh cycle | `2s` | +| `SWITCH_MAX_ENTRIES` | Max switch table entries exported per refresh | `16` | + +### Thermal collector + +| Variable | Description | Default | +|---|---|---| +| `THERMAL_ENABLED` | Enable thermal collector | `true` | +| `THERMAL_REFRESH_INTERVAL` | Cache refresh interval | `60s` | +| `THERMAL_TIMEOUT` | Timeout for one refresh cycle | `2s` | + +### Transceiver collector + +| Variable | Description | Default | +|---|---|---| +| `TRANSCEIVER_ENABLED` | Enable transceiver collector | `true` | +| `TRANSCEIVER_REFRESH_INTERVAL` | Cache refresh interval | `60s` | +| `TRANSCEIVER_TIMEOUT` | Timeout for one refresh cycle | `2s` | +| `TRANSCEIVER_MAX_PORTS` | Max transceiver ports exported per refresh | `1024` | + +### Routing collector + +| Variable | Description | Default | +|---|---|---| +| `ROUTING_ENABLED` | Enable routing collector | `false` | +| `ROUTING_REFRESH_INTERVAL` | Cache refresh interval | `60s` | +| `ROUTING_TIMEOUT` | Timeout for one refresh cycle | `2s` | +| `ROUTING_MAX_NEIGHBORS` | Max neighbor entries exported per refresh | `50000` | +| `ROUTING_MAX_ROUTES` | Max route entries exported per refresh | `200000` | + +### Platform health collector + +| Variable | Description | Default | +|---|---|---| +| `PLATFORM_HEALTH_ENABLED` | Enable platform health collector | `false` | +| `PLATFORM_HEALTH_REFRESH_INTERVAL` | Cache refresh interval | `60s` | +| `PLATFORM_HEALTH_TIMEOUT` | Timeout for one refresh cycle | `2s` | +| `PLATFORM_HEALTH_MAX_PROCESSES` | Max process entries exported per refresh | `512` | +| `PLATFORM_HEALTH_MAX_STORAGE_DEVICES` | Max storage devices exported per refresh | `128` | + ### System collector (experimental) | Variable | Description | Default | @@ -714,7 +769,12 @@ SONIC_DISABLED_METRICS= LLDP_ENABLED=true VLAN_ENABLED=true LAG_ENABLED=true +SWITCH_ENABLED=true +THERMAL_ENABLED=true +TRANSCEIVER_ENABLED=true FDB_ENABLED=false +ROUTING_ENABLED=false +PLATFORM_HEALTH_ENABLED=false SYSTEM_ENABLED=false DOCKER_ENABLED=false FRR_ENABLED=false diff --git a/cmd/sonic-exporter/main.go b/cmd/sonic-exporter/main.go index 5551bf3..910e2ab 100644 --- a/cmd/sonic-exporter/main.go +++ b/cmd/sonic-exporter/main.go @@ -61,6 +61,11 @@ func main() { vlanCollector := collector.NewVlanCollector(logger, metricFilter) lagCollector := collector.NewLagCollector(logger, metricFilter) fdbCollector := collector.NewFdbCollector(logger, metricFilter) + routingCollector := collector.NewRoutingCollector(logger, metricFilter) + switchCollector := collector.NewSwitchCollector(logger, metricFilter) + thermalCollector := collector.NewThermalCollector(logger, metricFilter) + transceiverCollector := collector.NewTransceiverCollector(logger, metricFilter) + platformHealthCollector := collector.NewPlatformHealthCollector(logger, metricFilter) systemCollector := collector.NewSystemCollector(logger, metricFilter) dockerCollector := collector.NewDockerCollector(logger, metricFilter) frrCollector := collector.NewFrrCollector(logger) @@ -80,6 +85,21 @@ func main() { if fdbCollector.IsEnabled() { prometheus.MustRegister(fdbCollector) } + if routingCollector.IsEnabled() { + prometheus.MustRegister(routingCollector) + } + if switchCollector.IsEnabled() { + prometheus.MustRegister(switchCollector) + } + if thermalCollector.IsEnabled() { + prometheus.MustRegister(thermalCollector) + } + if transceiverCollector.IsEnabled() { + prometheus.MustRegister(transceiverCollector) + } + if platformHealthCollector.IsEnabled() { + prometheus.MustRegister(platformHealthCollector) + } if systemCollector.IsEnabled() { prometheus.MustRegister(systemCollector) } diff --git a/fixtures/test/appl_db_data.json b/fixtures/test/appl_db_data.json index bb7e6e5..78b9c48 100644 --- a/fixtures/test/appl_db_data.json +++ b/fixtures/test/appl_db_data.json @@ -82,6 +82,32 @@ }, "LAG_MEMBER_TABLE:PortChannel2:Ethernet92": { "status": "enabled" + }, + "NEIGH_TABLE:eth0:192.0.2.1": { + "neigh": "00:11:22:33:44:55", + "family": "IPv4" + }, + "NEIGH_TABLE:Ethernet0:2001:db8::1": { + "neigh": "00:11:22:33:44:66", + "family": "IPv6" + }, + "ROUTE_TABLE:192.0.2.0/24": { + "protocol": "kernel", + "nexthop": "0.0.0.0", + "ifname": "eth0" + }, + "ROUTE_TABLE:2001:db8::/64": { + "protocol": "static", + "nexthop": "::", + "ifname": "Ethernet0" + }, + "SWITCH_TABLE:switch": { + "ecmp_hash_offset": "0", + "ecmp_hash_seed": "10", + "fdb_aging_time": "600", + "lag_hash_offset": "1", + "lag_hash_seed": "20", + "ordered_ecmp": "true" } } } diff --git a/fixtures/test/state_db_data.json b/fixtures/test/state_db_data.json index c263ba2..7ca2610 100644 --- a/fixtures/test/state_db_data.json +++ b/fixtures/test/state_db_data.json @@ -103,6 +103,125 @@ "model": "Model-X", "revision": "A01" }, + "ASIC_TEMPERATURE_INFO": { + "temperature_0": "45", + "temperature_1": "47", + "maximum_temperature": "50", + "average_temperature": "46" + }, + "TEMPERATURE_SFP_MAX": { + "maximum_temperature": "31", + "timestamp": "20260605 10:46:20" + }, + "TRANSCEIVER_STATUS|Ethernet0": { + "module_state": "ModuleReady", + "module_fault_cause": "No Fault detected", + "tx1OutputStatus": "True", + "tx2OutputStatus": "False", + "rx1OutputStatusHostlane": "True", + "rx2OutputStatusHostlane": "True", + "tx1disable": "False", + "tx2disable": "True", + "last_update_time": "Fri Jun 05 10:44:56 2026" + }, + "TRANSCEIVER_STATUS_FLAG|Ethernet0": { + "tx1fault": "False", + "tx2fault": "True", + "rx1los": "False", + "rx2los": "True", + "last_update_time": "Fri Jun 05 10:44:56 2026" + }, + "TRANSCEIVER_STATUS_FLAG_CHANGE_COUNT|Ethernet0": { + "tx1fault": "0", + "tx2fault": "1", + "rx1los": "0", + "rx2los": "2" + }, + "TRANSCEIVER_STATUS_FLAG_CLEAR_TIME|Ethernet0": { + "tx1fault": "never", + "tx2fault": "Fri Jun 05 10:44:00 2026", + "rx1los": "never", + "rx2los": "Fri Jun 05 10:45:00 2026" + }, + "TRANSCEIVER_STATUS_FLAG_SET_TIME|Ethernet0": { + "tx1fault": "never", + "tx2fault": "Fri Jun 05 10:40:00 2026", + "rx1los": "never", + "rx2los": "Fri Jun 05 10:41:00 2026" + }, + "TRANSCEIVER_DOM_FLAG|Ethernet0": { + "tempHAlarm": "False", + "tempLAlarm": "False", + "tx1powerHAlarm": "True", + "tx1powerLWarn": "False", + "last_update_time": "Fri Jun 05 10:44:56 2026" + }, + "TRANSCEIVER_DOM_FLAG_CHANGE_COUNT|Ethernet0": { + "tempHAlarm": "0", + "tempLAlarm": "0", + "tx1powerHAlarm": "3", + "tx1powerLWarn": "1" + }, + "TRANSCEIVER_DOM_FLAG_CLEAR_TIME|Ethernet0": { + "tempHAlarm": "never", + "tempLAlarm": "never", + "tx1powerHAlarm": "Fri Jun 05 10:43:00 2026", + "tx1powerLWarn": "Fri Jun 05 10:42:00 2026" + }, + "TRANSCEIVER_DOM_FLAG_SET_TIME|Ethernet0": { + "tempHAlarm": "never", + "tempLAlarm": "never", + "tx1powerHAlarm": "Fri Jun 05 10:41:00 2026", + "tx1powerLWarn": "Fri Jun 05 10:40:00 2026" + }, + "TRANSCEIVER_DOM_THRESHOLD|Ethernet0": { + "temphighalarm": "80.0", + "templowalarm": "-5.0", + "txpowerhighalarm": "6.5", + "txpowerlowwarning": "-4.3", + "last_update_time": "Fri Jun 05 10:44:56 2026" + }, + "PROCESS_STATS|1": { + "CMD": "/sbin/init", + "CPU": "0.0", + "MEM": "0.1", + "PPID": "0", + "STIME": "May29", + "TIME": "0:07:49", + "TT": "None", + "UID": "0" + }, + "PROCESS_STATS|42": { + "CMD": "/usr/bin/orchagent", + "CPU": "1.5", + "MEM": "2.5", + "PPID": "1", + "STIME": "May29", + "TIME": "0:00:30", + "TT": "None", + "UID": "1000" + }, + "PROCESS_STATS|LastUpdateTime": { + "lastupdate": "2026-06-05 10:26:12" + }, + "STORAGE_INFO|sda": { + "device_model": "M.2 (S80) 3ME4", + "serial": "TESTSERIAL001", + "firmware": "L20420", + "health": "100", + "temperature": "32", + "latest_fsio_reads": "18564", + "latest_fsio_writes": "894008", + "disk_io_reads": "9704", + "disk_io_writes": "17085", + "reserved_blocks": "133", + "last_sync_time": "2026-06-05 10:26:12", + "total_fsio_reads": "29562", + "total_fsio_writes": "1821638" + }, + "SYSTEM_HEALTH_INFO": { + "summary": "OK" + }, "DOCKER_STATS|0001": { "NAME": "swss", "CPU%": "1.5", diff --git a/internal/collector/collector_test.go b/internal/collector/collector_test.go index 907cf4b..d44b54c 100644 --- a/internal/collector/collector_test.go +++ b/internal/collector/collector_test.go @@ -64,6 +64,35 @@ func getMetricFamily(t *testing.T, c prometheus.Collector, metricName string) *c return nil } +func metricWithLabelsExists(metricFamily *clientModel.MetricFamily, labels map[string]string, value float64) bool { + if metricFamily == nil { + return false + } + + for _, metric := range metricFamily.Metric { + matches := true + for labelName, labelValue := range labels { + found := false + for _, label := range metric.Label { + if label.GetName() == labelName && label.GetValue() == labelValue { + found = true + break + } + } + if !found { + matches = false + break + } + } + + if matches && metric.GetGauge() != nil && metric.GetGauge().GetValue() == value { + return true + } + } + + return false +} + func hasPsuSlotInMetricFamily(metricFamily *clientModel.MetricFamily, slot string) bool { if metricFamily == nil { return false @@ -193,6 +222,11 @@ func TestMain(m *testing.M) { os.Setenv("VLAN_ENABLED", "true") os.Setenv("LAG_ENABLED", "true") os.Setenv("FDB_ENABLED", "true") + os.Setenv("ROUTING_ENABLED", "true") + os.Setenv("SWITCH_ENABLED", "true") + os.Setenv("THERMAL_ENABLED", "true") + os.Setenv("TRANSCEIVER_ENABLED", "true") + os.Setenv("PLATFORM_HEALTH_ENABLED", "true") os.Setenv("SYSTEM_ENABLED", "true") os.Setenv("DOCKER_ENABLED", "true") os.Setenv("SYSTEM_COMMAND_ENABLED", "false") @@ -215,6 +249,11 @@ func TestMain(m *testing.M) { os.Unsetenv("VLAN_ENABLED") os.Unsetenv("LAG_ENABLED") os.Unsetenv("FDB_ENABLED") + os.Unsetenv("ROUTING_ENABLED") + os.Unsetenv("SWITCH_ENABLED") + os.Unsetenv("THERMAL_ENABLED") + os.Unsetenv("TRANSCEIVER_ENABLED") + os.Unsetenv("PLATFORM_HEALTH_ENABLED") os.Unsetenv("SYSTEM_ENABLED") os.Unsetenv("DOCKER_ENABLED") os.Unsetenv("SYSTEM_COMMAND_ENABLED") @@ -577,6 +616,194 @@ func TestLldpCollector(t *testing.T) { if err := testutil.CollectAndCompare(lldpCollector, strings.NewReader(neighborMetadata+neighborExpected), "sonic_lldp_neighbor_info"); err != nil { t.Errorf("unexpected collecting result:\n%s", err) } + + localChassisMetadata := ` + # HELP sonic_lldp_local_chassis_info Non-numeric data about local LLDP chassis, value is always 1 + # TYPE sonic_lldp_local_chassis_info gauge + ` + + localChassisExpected := ` + sonic_lldp_local_chassis_info{chassis_id="74:86:e2:a4:6c:a5",system_name="net-tor-lab002.lau1"} 1 + ` + + if err := testutil.CollectAndCompare(lldpCollector, strings.NewReader(localChassisMetadata+localChassisExpected), "sonic_lldp_local_chassis_info"); err != nil { + t.Errorf("unexpected collecting result:\n%s", err) + } +} + +func TestRoutingCollector(t *testing.T) { + promslogConfig := &promslog.Config{} + logger := promslog.New(promslogConfig) + + routingCollector := NewRoutingCollector(logger, NewMetricFilter(logger)) + + problems, err := testutil.CollectAndLint(routingCollector) + if err != nil { + t.Error("metric lint completed with errors") + } + + for _, problem := range problems { + t.Errorf("metric %v has a problem: %v", problem.Metric, problem.Text) + } + + neighborFamily := getMetricFamily(t, routingCollector, "sonic_routing_neighbor_entries") + if !metricWithLabelsExists(neighborFamily, map[string]string{"interface": "eth0", "family": "ipv4"}, 1) { + t.Fatalf("expected ipv4 neighbor count for eth0") + } + if !metricWithLabelsExists(neighborFamily, map[string]string{"interface": "Ethernet0", "family": "ipv6"}, 1) { + t.Fatalf("expected ipv6 neighbor count for Ethernet0") + } + + routeFamily := getMetricFamily(t, routingCollector, "sonic_routing_route_entries") + if !metricWithLabelsExists(routeFamily, map[string]string{"family": "ipv4", "protocol": "kernel"}, 1) { + t.Fatalf("expected ipv4 route count for kernel routes") + } + if !metricWithLabelsExists(routeFamily, map[string]string{"family": "ipv6", "protocol": "static"}, 1) { + t.Fatalf("expected ipv6 route count for static routes") + } +} + +func TestSwitchCollector(t *testing.T) { + promslogConfig := &promslog.Config{} + logger := promslog.New(promslogConfig) + + switchCollector := NewSwitchCollector(logger, NewMetricFilter(logger)) + + problems, err := testutil.CollectAndLint(switchCollector) + if err != nil { + t.Error("metric lint completed with errors") + } + + for _, problem := range problems { + t.Errorf("metric %v has a problem: %v", problem.Metric, problem.Text) + } + + metadata := ` + # HELP sonic_switch_fdb_aging_time_seconds FDB aging time from SWITCH_TABLE + # TYPE sonic_switch_fdb_aging_time_seconds gauge + # HELP sonic_switch_ordered_ecmp Whether ordered ECMP is enabled (1=yes, 0=no) + # TYPE sonic_switch_ordered_ecmp gauge + ` + expected := ` + sonic_switch_fdb_aging_time_seconds{switch="switch"} 600 + sonic_switch_ordered_ecmp{switch="switch"} 1 + ` + if err := testutil.CollectAndCompare(switchCollector, strings.NewReader(metadata+expected), "sonic_switch_fdb_aging_time_seconds", "sonic_switch_ordered_ecmp"); err != nil { + t.Errorf("unexpected collecting result:\n%s", err) + } +} + +func TestThermalCollector(t *testing.T) { + promslogConfig := &promslog.Config{} + logger := promslog.New(promslogConfig) + + thermalCollector := NewThermalCollector(logger, NewMetricFilter(logger)) + + problems, err := testutil.CollectAndLint(thermalCollector) + if err != nil { + t.Error("metric lint completed with errors") + } + + for _, problem := range problems { + t.Errorf("metric %v has a problem: %v", problem.Metric, problem.Text) + } + + metadata := ` + # HELP sonic_thermal_asic_temperature_celsius ASIC per-sensor temperature in celsius + # TYPE sonic_thermal_asic_temperature_celsius gauge + # HELP sonic_thermal_sfp_maximum_temperature_celsius Maximum transceiver temperature across all optics + # TYPE sonic_thermal_sfp_maximum_temperature_celsius gauge + ` + expected := ` + sonic_thermal_asic_temperature_celsius{sensor="temperature_0"} 45 + sonic_thermal_asic_temperature_celsius{sensor="temperature_1"} 47 + sonic_thermal_sfp_maximum_temperature_celsius 31 + ` + if err := testutil.CollectAndCompare(thermalCollector, strings.NewReader(metadata+expected), "sonic_thermal_asic_temperature_celsius", "sonic_thermal_sfp_maximum_temperature_celsius"); err != nil { + t.Errorf("unexpected collecting result:\n%s", err) + } +} + +func TestTransceiverCollector(t *testing.T) { + promslogConfig := &promslog.Config{} + logger := promslog.New(promslogConfig) + + transceiverCollector := NewTransceiverCollector(logger, NewMetricFilter(logger)) + + problems, err := testutil.CollectAndLint(transceiverCollector) + if err != nil { + t.Error("metric lint completed with errors") + } + + for _, problem := range problems { + t.Errorf("metric %v has a problem: %v", problem.Metric, problem.Text) + } + + infoMetadata := ` + # HELP sonic_transceiver_module_info Transceiver module state metadata, value is always 1 + # TYPE sonic_transceiver_module_info gauge + ` + infoExpected := ` + sonic_transceiver_module_info{device="Ethernet0",module_fault_cause="No Fault detected",module_state="ModuleReady"} 1 + ` + if err := testutil.CollectAndCompare(transceiverCollector, strings.NewReader(infoMetadata+infoExpected), "sonic_transceiver_module_info"); err != nil { + t.Errorf("unexpected collecting result:\n%s", err) + } + + statusFamily := getMetricFamily(t, transceiverCollector, "sonic_transceiver_status_value") + if !metricWithLabelsExists(statusFamily, map[string]string{"device": "Ethernet0", "field": "rx1OutputStatusHostlane"}, 1) { + t.Fatalf("expected rx1OutputStatusHostlane status metric") + } + if !metricWithLabelsExists(statusFamily, map[string]string{"device": "Ethernet0", "field": "tx2disable"}, 1) { + t.Fatalf("expected tx2disable status metric") + } + + thresholdFamily := getMetricFamily(t, transceiverCollector, "sonic_transceiver_dom_threshold_value") + if !metricWithLabelsExists(thresholdFamily, map[string]string{"device": "Ethernet0", "threshold": "temphighalarm"}, 80) { + t.Fatalf("expected temphighalarm threshold metric") + } + if !metricWithLabelsExists(thresholdFamily, map[string]string{"device": "Ethernet0", "threshold": "txpowerlowwarning"}, -4.3) { + t.Fatalf("expected txpowerlowwarning threshold metric") + } +} + +func TestPlatformHealthCollector(t *testing.T) { + promslogConfig := &promslog.Config{} + logger := promslog.New(promslogConfig) + + platformCollector := NewPlatformHealthCollector(logger, NewMetricFilter(logger)) + + problems, err := testutil.CollectAndLint(platformCollector) + if err != nil { + t.Error("metric lint completed with errors") + } + + for _, problem := range problems { + t.Errorf("metric %v has a problem: %v", problem.Metric, problem.Text) + } + + processInfoFamily := getMetricFamily(t, platformCollector, "sonic_platform_process_info") + if !metricWithLabelsExists(processInfoFamily, map[string]string{"pid": "1", "process": "init", "uid": "0"}, 1) { + t.Fatalf("expected init process info metric") + } + if !metricWithLabelsExists(processInfoFamily, map[string]string{"pid": "42", "process": "orchagent", "uid": "1000"}, 1) { + t.Fatalf("expected orchagent process info metric") + } + + processCPUFamily := getMetricFamily(t, platformCollector, "sonic_platform_process_cpu_percent") + if !metricWithLabelsExists(processCPUFamily, map[string]string{"pid": "42", "process": "orchagent"}, 1.5) { + t.Fatalf("expected orchagent cpu metric") + } + + storageFamily := getMetricFamily(t, platformCollector, "sonic_platform_storage_health_percent") + if !metricWithLabelsExists(storageFamily, map[string]string{"device": "sda"}, 100) { + t.Fatalf("expected storage health metric for sda") + } + + systemHealthFamily := getMetricFamily(t, platformCollector, "sonic_platform_system_health_info") + if !metricWithLabelsExists(systemHealthFamily, map[string]string{"summary": "summary=OK"}, 1) { + t.Fatalf("expected system health summary metric") + } } func TestVlanCollector(t *testing.T) { diff --git a/internal/collector/lldp_collector.go b/internal/collector/lldp_collector.go index 512e146..d0908bb 100644 --- a/internal/collector/lldp_collector.go +++ b/internal/collector/lldp_collector.go @@ -25,6 +25,7 @@ type lldpCollectorConfig struct { } type lldpCollector struct { + lldpLocalChassisInfo *prometheus.Desc lldpNeighborInfo *prometheus.Desc lldpNeighbors *prometheus.Desc scrapeDuration *prometheus.Desc @@ -52,6 +53,8 @@ func NewLldpCollector(logger *slog.Logger, metricFilter MetricFilter) *lldpColle ) collector := &lldpCollector{ + lldpLocalChassisInfo: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "local_chassis_info"), + "Non-numeric data about local LLDP chassis, value is always 1", []string{"system_name", "chassis_id"}, nil), lldpNeighborInfo: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "neighbor_info"), "Non-numeric data about LLDP neighbor, value is always 1", []string{"local_interface", "local_role", "remote_system_name", "remote_port_id", "remote_port_desc", "remote_port_id_subtype", "remote_port_display", "remote_chassis_id", "remote_mgmt_ip"}, nil), lldpNeighbors: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "neighbors"), @@ -86,6 +89,7 @@ func (collector *lldpCollector) IsEnabled() bool { func (collector *lldpCollector) Describe(ch chan<- *prometheus.Desc) { ch <- collector.lldpNeighborInfo + ch <- collector.lldpLocalChassisInfo ch <- collector.lldpNeighbors ch <- collector.scrapeDuration ch <- collector.scrapeCollectorSuccess @@ -182,10 +186,24 @@ func (collector *lldpCollector) scrapeMetrics(ctx context.Context) ([]prometheus sort.Strings(lldpKeys) - metrics := make([]prometheus.Metric, 0, len(lldpKeys)) + metrics := make([]prometheus.Metric, 0, len(lldpKeys)+1) neighbors := 0 skippedEntries := 0 + localChassisData, err := redisClient.HgetAllFromDb(ctx, "APPL_DB", "LLDP_LOC_CHASSIS") + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read LLDP local chassis entry: %w", err) + } + if len(localChassisData) > 0 && collector.metricFilter.Enabled("sonic_lldp_local_chassis_info") { + metrics = append(metrics, prometheus.MustNewConstMetric( + collector.lldpLocalChassisInfo, + prometheus.GaugeValue, + 1, + localChassisData["lldp_loc_sys_name"], + localChassisData["lldp_loc_chassis_id"], + )) + } + for _, lldpKey := range lldpKeys { if neighbors >= collector.config.maxNeighbors { skippedEntries++ diff --git a/internal/collector/platform_health_collector.go b/internal/collector/platform_health_collector.go new file mode 100644 index 0000000..93ed58a --- /dev/null +++ b/internal/collector/platform_health_collector.go @@ -0,0 +1,349 @@ +package collector + +import ( + "context" + "fmt" + "log/slog" + "sort" + "strings" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/vinted/sonic-exporter/pkg/redis" +) + +type platformHealthCollectorConfig struct { + enabled bool + refreshInterval time.Duration + timeout time.Duration + maxProcesses int + maxStorageDevices int + redisScanCount int64 +} + +type platformHealthCollector struct { + processInfo *prometheus.Desc + processCPUPercent *prometheus.Desc + processMemoryPercent *prometheus.Desc + processRunning *prometheus.Desc + storageInfo *prometheus.Desc + storageHealthPercent *prometheus.Desc + storageTemperature *prometheus.Desc + storageReservedBlocks *prometheus.Desc + storageFsioReads *prometheus.Desc + storageFsioWrites *prometheus.Desc + storageDiskReads *prometheus.Desc + storageDiskWrites *prometheus.Desc + systemHealthInfo *prometheus.Desc + entriesSkipped *prometheus.Desc + entriesTruncated *prometheus.Desc + scrapeDuration *prometheus.Desc + scrapeCollectorSuccess *prometheus.Desc + cacheAge *prometheus.Desc + + logger *slog.Logger + metricFilter MetricFilter + config platformHealthCollectorConfig + + mu sync.RWMutex + cachedMetrics []prometheus.Metric + lastSuccess float64 + lastScrapeDuration float64 + lastSkippedEntries float64 + lastTruncated float64 + lastRefreshTime time.Time +} + +func NewPlatformHealthCollector(logger *slog.Logger, metricFilter MetricFilter) *platformHealthCollector { + const ( + namespace = "sonic" + subsystem = "platform" + ) + + collector := &platformHealthCollector{ + processInfo: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "process_info"), + "Process metadata from PROCESS_STATS, value is always 1", []string{"pid", "process", "uid"}, nil), + processCPUPercent: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "process_cpu_percent"), + "Process CPU percent from PROCESS_STATS", []string{"pid", "process"}, nil), + processMemoryPercent: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "process_memory_percent"), + "Process memory percent from PROCESS_STATS", []string{"pid", "process"}, nil), + processRunning: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "process_running"), + "Whether process entry is present in PROCESS_STATS", []string{"pid", "process"}, nil), + storageInfo: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "storage_info"), + "Storage metadata from STORAGE_INFO, value is always 1", []string{"device", "model", "serial", "firmware"}, nil), + storageHealthPercent: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "storage_health_percent"), + "Storage health percent from STORAGE_INFO", []string{"device"}, nil), + storageTemperature: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "storage_temperature_celsius"), + "Storage temperature from STORAGE_INFO", []string{"device"}, nil), + storageReservedBlocks: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "storage_reserved_blocks"), + "Storage reserved blocks from STORAGE_INFO", []string{"device"}, nil), + storageFsioReads: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "storage_total_fsio_reads_total"), + "Storage total fsio reads from STORAGE_INFO", []string{"device"}, nil), + storageFsioWrites: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "storage_total_fsio_writes_total"), + "Storage total fsio writes from STORAGE_INFO", []string{"device"}, nil), + storageDiskReads: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "storage_disk_io_reads_total"), + "Storage disk I/O reads from STORAGE_INFO", []string{"device"}, nil), + storageDiskWrites: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "storage_disk_io_writes_total"), + "Storage disk I/O writes from STORAGE_INFO", []string{"device"}, nil), + systemHealthInfo: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "system_health_info"), + "System health summary from SYSTEM_HEALTH_INFO, value is always 1", []string{"summary"}, nil), + entriesSkipped: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "entries_skipped"), + "Number of platform health entries skipped during latest refresh", nil, nil), + entriesTruncated: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "entries_truncated"), + "Whether platform health collection hit entry limits (1=yes, 0=no)", nil, nil), + scrapeDuration: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "scrape_duration_seconds"), + "Time it took for exporter to refresh platform health metrics", nil, nil), + scrapeCollectorSuccess: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "collector_success"), + "Whether platform health collector succeeded", nil, nil), + cacheAge: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "cache_age_seconds"), + "Age of latest platform health cache refresh", nil, nil), + logger: logger, + metricFilter: metricFilter, + config: platformHealthCollectorConfig{ + enabled: parseBoolEnv(logger, "PLATFORM_HEALTH_ENABLED", false), + refreshInterval: parseDurationEnv(logger, "PLATFORM_HEALTH_REFRESH_INTERVAL", 60*time.Second), + timeout: parseDurationEnv(logger, "PLATFORM_HEALTH_TIMEOUT", 2*time.Second), + maxProcesses: parseIntEnv(logger, "PLATFORM_HEALTH_MAX_PROCESSES", 512), + maxStorageDevices: parseIntEnv(logger, "PLATFORM_HEALTH_MAX_STORAGE_DEVICES", 128), + redisScanCount: 256, + }, + } + + if !collector.config.enabled { + collector.logger.Info("Platform health collector is disabled") + return collector + } + + collector.refreshMetrics() + go collector.refreshLoop() + + return collector +} + +func (collector *platformHealthCollector) IsEnabled() bool { return collector.config.enabled } + +func (collector *platformHealthCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- collector.processInfo + ch <- collector.processCPUPercent + ch <- collector.processMemoryPercent + ch <- collector.processRunning + ch <- collector.storageInfo + ch <- collector.storageHealthPercent + ch <- collector.storageTemperature + ch <- collector.storageReservedBlocks + ch <- collector.storageFsioReads + ch <- collector.storageFsioWrites + ch <- collector.storageDiskReads + ch <- collector.storageDiskWrites + ch <- collector.systemHealthInfo + ch <- collector.entriesSkipped + ch <- collector.entriesTruncated + ch <- collector.scrapeDuration + ch <- collector.scrapeCollectorSuccess + ch <- collector.cacheAge +} + +func (collector *platformHealthCollector) Collect(ch chan<- prometheus.Metric) { + if !collector.config.enabled { + return + } + + collector.mu.RLock() + cachedMetrics := append([]prometheus.Metric{}, collector.cachedMetrics...) + lastScrapeDuration := collector.lastScrapeDuration + lastSuccess := collector.lastSuccess + lastSkippedEntries := collector.lastSkippedEntries + lastTruncated := collector.lastTruncated + lastRefreshTime := collector.lastRefreshTime + collector.mu.RUnlock() + + for _, metric := range cachedMetrics { + ch <- metric + } + + cacheAge := 0.0 + if !lastRefreshTime.IsZero() { + cacheAge = time.Since(lastRefreshTime).Seconds() + } + if collector.metricFilter.Enabled("sonic_platform_entries_skipped") { + ch <- prometheus.MustNewConstMetric(collector.entriesSkipped, prometheus.GaugeValue, lastSkippedEntries) + } + if collector.metricFilter.Enabled("sonic_platform_entries_truncated") { + ch <- prometheus.MustNewConstMetric(collector.entriesTruncated, prometheus.GaugeValue, lastTruncated) + } + if collector.metricFilter.Enabled("sonic_platform_scrape_duration_seconds") { + ch <- prometheus.MustNewConstMetric(collector.scrapeDuration, prometheus.GaugeValue, lastScrapeDuration) + } + if collector.metricFilter.Enabled("sonic_platform_collector_success") { + ch <- prometheus.MustNewConstMetric(collector.scrapeCollectorSuccess, prometheus.GaugeValue, lastSuccess) + } + if collector.metricFilter.Enabled("sonic_platform_cache_age_seconds") { + ch <- prometheus.MustNewConstMetric(collector.cacheAge, prometheus.GaugeValue, cacheAge) + } +} + +func (collector *platformHealthCollector) refreshLoop() { + ticker := time.NewTicker(collector.config.refreshInterval) + defer ticker.Stop() + for range ticker.C { + collector.refreshMetrics() + } +} + +func (collector *platformHealthCollector) refreshMetrics() { + start := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), collector.config.timeout) + defer cancel() + metrics, skippedEntries, truncated, err := collector.scrapeMetrics(ctx) + scrapeDuration := time.Since(start).Seconds() + + collector.mu.Lock() + defer collector.mu.Unlock() + collector.lastScrapeDuration = scrapeDuration + if err != nil { + collector.lastSuccess = 0 + collector.logger.Error("Error refreshing platform health metrics", "error", err) + return + } + collector.cachedMetrics = metrics + collector.lastSkippedEntries = float64(skippedEntries) + collector.lastTruncated = truncated + collector.lastSuccess = 1 + collector.lastRefreshTime = time.Now() +} + +func (collector *platformHealthCollector) scrapeMetrics(ctx context.Context) ([]prometheus.Metric, int, float64, error) { + redisClient, err := redis.NewClient() + if err != nil { + return nil, 0, 0, fmt.Errorf("redis client initialization failed: %w", err) + } + defer redisClient.Close() + + metrics := []prometheus.Metric{} + skippedEntries := 0 + truncated := 0.0 + + processKeys, err := redisClient.ScanKeysFromDb(ctx, "STATE_DB", "PROCESS_STATS|*", collector.config.redisScanCount) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to scan process keys: %w", err) + } + sort.Strings(processKeys) + processCount := 0 + for _, processKey := range processKeys { + if processKey == "PROCESS_STATS|LastUpdateTime" { + continue + } + if processCount >= collector.config.maxProcesses { + truncated = 1 + skippedEntries++ + continue + } + + pid, err := parseKeySuffix(processKey, "PROCESS_STATS|") + if err != nil { + skippedEntries++ + continue + } + + processData, err := redisClient.HgetAllFromDb(ctx, "STATE_DB", processKey) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read process entry %s: %w", processKey, err) + } + if len(processData) == 0 { + skippedEntries++ + continue + } + + processName := normalizeProcessName(processData["CMD"]) + uid := strings.TrimSpace(processData["UID"]) + if collector.metricFilter.Enabled("sonic_platform_process_info") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.processInfo, prometheus.GaugeValue, 1, pid, processName, uid)) + } + if collector.metricFilter.Enabled("sonic_platform_process_running") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.processRunning, prometheus.GaugeValue, 1, pid, processName)) + } + if value, ok := parseCounterLike(processData["CPU"]); ok && collector.metricFilter.Enabled("sonic_platform_process_cpu_percent") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.processCPUPercent, prometheus.GaugeValue, value, pid, processName)) + } + if value, ok := parseCounterLike(processData["MEM"]); ok && collector.metricFilter.Enabled("sonic_platform_process_memory_percent") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.processMemoryPercent, prometheus.GaugeValue, value, pid, processName)) + } + + processCount++ + } + + storageKeys, err := redisClient.ScanKeysFromDb(ctx, "STATE_DB", "STORAGE_INFO|*", collector.config.redisScanCount) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to scan storage keys: %w", err) + } + sort.Strings(storageKeys) + storageCount := 0 + for _, storageKey := range storageKeys { + device, err := parseKeySuffix(storageKey, "STORAGE_INFO|") + if err != nil || device == "FSSTATS_SYNC" { + continue + } + if storageCount >= collector.config.maxStorageDevices { + truncated = 1 + skippedEntries++ + continue + } + + storageData, err := redisClient.HgetAllFromDb(ctx, "STATE_DB", storageKey) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read storage entry %s: %w", storageKey, err) + } + if len(storageData) == 0 { + skippedEntries++ + continue + } + + if collector.metricFilter.Enabled("sonic_platform_storage_info") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.storageInfo, prometheus.GaugeValue, 1, device, storageData["device_model"], storageData["serial"], storageData["firmware"])) + } + if value, ok := parseCounterLike(storageData["health"]); ok && collector.metricFilter.Enabled("sonic_platform_storage_health_percent") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.storageHealthPercent, prometheus.GaugeValue, value, device)) + } + if value, ok := parseCounterLike(storageData["temperature"]); ok && collector.metricFilter.Enabled("sonic_platform_storage_temperature_celsius") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.storageTemperature, prometheus.GaugeValue, value, device)) + } + if value, ok := parseCounterLike(storageData["reserved_blocks"]); ok && collector.metricFilter.Enabled("sonic_platform_storage_reserved_blocks") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.storageReservedBlocks, prometheus.GaugeValue, value, device)) + } + if value, ok := parseCounterLike(storageData["total_fsio_reads"]); ok && collector.metricFilter.Enabled("sonic_platform_storage_total_fsio_reads_total") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.storageFsioReads, prometheus.CounterValue, value, device)) + } + if value, ok := parseCounterLike(storageData["total_fsio_writes"]); ok && collector.metricFilter.Enabled("sonic_platform_storage_total_fsio_writes_total") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.storageFsioWrites, prometheus.CounterValue, value, device)) + } + if value, ok := parseCounterLike(storageData["disk_io_reads"]); ok && collector.metricFilter.Enabled("sonic_platform_storage_disk_io_reads_total") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.storageDiskReads, prometheus.CounterValue, value, device)) + } + if value, ok := parseCounterLike(storageData["disk_io_writes"]); ok && collector.metricFilter.Enabled("sonic_platform_storage_disk_io_writes_total") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.storageDiskWrites, prometheus.CounterValue, value, device)) + } + + storageCount++ + } + + systemHealthData, err := redisClient.HgetAllFromDb(ctx, "STATE_DB", "SYSTEM_HEALTH_INFO") + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read SYSTEM_HEALTH_INFO: %w", err) + } + if len(systemHealthData) > 0 { + fields := make([]string, 0, len(systemHealthData)) + for field := range systemHealthData { + fields = append(fields, field) + } + sort.Strings(fields) + for _, field := range fields { + if collector.metricFilter.Enabled("sonic_platform_system_health_info") { + summary := strings.TrimSpace(field + "=" + systemHealthData[field]) + metrics = append(metrics, prometheus.MustNewConstMetric(collector.systemHealthInfo, prometheus.GaugeValue, 1, summary)) + } + } + } + + return metrics, skippedEntries, truncated, nil +} diff --git a/internal/collector/redis_value_helpers.go b/internal/collector/redis_value_helpers.go new file mode 100644 index 0000000..d7a27ee --- /dev/null +++ b/internal/collector/redis_value_helpers.go @@ -0,0 +1,78 @@ +package collector + +import ( + "fmt" + "path/filepath" + "strconv" + "strings" + "time" +) + +func parseBoolish(value string) (float64, bool) { + switch strings.ToLower(strings.TrimSpace(value)) { + case "true", "up", "yes", "1", "moduleready", "datapathactivated", "ok": + return 1, true + case "false", "down", "no", "0", "moduleabsent", "modulefault": + return 0, true + default: + return 0, false + } +} + +func parseCounterLike(value string) (float64, bool) { + parsedValue, err := strconv.ParseFloat(strings.TrimSpace(value), 64) + if err != nil { + return 0, false + } + + return parsedValue, true +} + +func parseEventTime(value string) (float64, bool) { + trimmedValue := strings.TrimSpace(value) + if trimmedValue == "" || strings.EqualFold(trimmedValue, "never") { + return 0, false + } + + parsedTime, err := time.Parse("Mon Jan 02 15:04:05 2006", trimmedValue) + if err != nil { + return 0, false + } + + return float64(parsedTime.Unix()), true +} + +func normalizeProcessName(command string) string { + trimmedCommand := strings.TrimSpace(command) + if trimmedCommand == "" { + return "unknown" + } + + firstToken := strings.Fields(trimmedCommand)[0] + if firstToken == "" { + return "unknown" + } + + return filepath.Base(firstToken) +} + +func familyFromPrefix(value string) string { + if strings.Contains(value, ":") { + return "ipv6" + } + + return "ipv4" +} + +func parseKeySuffix(key, prefix string) (string, error) { + if !strings.HasPrefix(key, prefix) { + return "", fmt.Errorf("key %q does not start with %q", key, prefix) + } + + suffix := strings.TrimPrefix(key, prefix) + if suffix == "" { + return "", fmt.Errorf("key %q has empty suffix", key) + } + + return suffix, nil +} diff --git a/internal/collector/routing_collector.go b/internal/collector/routing_collector.go new file mode 100644 index 0000000..df21af8 --- /dev/null +++ b/internal/collector/routing_collector.go @@ -0,0 +1,286 @@ +package collector + +import ( + "context" + "fmt" + "log/slog" + "sort" + "strings" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/vinted/sonic-exporter/pkg/redis" +) + +type routingCollectorConfig struct { + enabled bool + refreshInterval time.Duration + timeout time.Duration + maxNeighbors int + maxRoutes int + redisScanCount int64 +} + +type routingCollector struct { + neighborEntries *prometheus.Desc + routeEntries *prometheus.Desc + entriesSkipped *prometheus.Desc + entriesTruncated *prometheus.Desc + scrapeDuration *prometheus.Desc + scrapeCollectorSuccess *prometheus.Desc + cacheAge *prometheus.Desc + + logger *slog.Logger + metricFilter MetricFilter + config routingCollectorConfig + + mu sync.RWMutex + cachedMetrics []prometheus.Metric + lastSuccess float64 + lastScrapeDuration float64 + lastSkippedEntries float64 + lastTruncated float64 + lastRefreshTime time.Time +} + +func NewRoutingCollector(logger *slog.Logger, metricFilter MetricFilter) *routingCollector { + const ( + namespace = "sonic" + subsystem = "routing" + ) + + collector := &routingCollector{ + neighborEntries: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "neighbor_entries"), + "Number of neighbor entries by interface and address family", []string{"interface", "family"}, nil), + routeEntries: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "route_entries"), + "Number of route entries by address family and protocol", []string{"family", "protocol"}, nil), + entriesSkipped: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "entries_skipped"), + "Number of routing entries skipped during latest refresh", nil, nil), + entriesTruncated: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "entries_truncated"), + "Whether routing collection hit entry limits (1=yes, 0=no)", nil, nil), + scrapeDuration: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "scrape_duration_seconds"), + "Time it took for exporter to refresh routing metrics", nil, nil), + scrapeCollectorSuccess: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "collector_success"), + "Whether routing collector succeeded", nil, nil), + cacheAge: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "cache_age_seconds"), + "Age of latest routing cache refresh", nil, nil), + logger: logger, + metricFilter: metricFilter, + config: routingCollectorConfig{ + enabled: parseBoolEnv(logger, "ROUTING_ENABLED", false), + refreshInterval: parseDurationEnv(logger, "ROUTING_REFRESH_INTERVAL", 60*time.Second), + timeout: parseDurationEnv(logger, "ROUTING_TIMEOUT", 2*time.Second), + maxNeighbors: parseIntEnv(logger, "ROUTING_MAX_NEIGHBORS", 50000), + maxRoutes: parseIntEnv(logger, "ROUTING_MAX_ROUTES", 200000), + redisScanCount: 256, + }, + } + + if !collector.config.enabled { + collector.logger.Info("Routing collector is disabled") + return collector + } + + collector.refreshMetrics() + go collector.refreshLoop() + + return collector +} + +func (collector *routingCollector) IsEnabled() bool { + return collector.config.enabled +} + +func (collector *routingCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- collector.neighborEntries + ch <- collector.routeEntries + ch <- collector.entriesSkipped + ch <- collector.entriesTruncated + ch <- collector.scrapeDuration + ch <- collector.scrapeCollectorSuccess + ch <- collector.cacheAge +} + +func (collector *routingCollector) Collect(ch chan<- prometheus.Metric) { + if !collector.config.enabled { + return + } + + collector.mu.RLock() + cachedMetrics := append([]prometheus.Metric{}, collector.cachedMetrics...) + lastScrapeDuration := collector.lastScrapeDuration + lastSuccess := collector.lastSuccess + lastSkippedEntries := collector.lastSkippedEntries + lastTruncated := collector.lastTruncated + lastRefreshTime := collector.lastRefreshTime + collector.mu.RUnlock() + + for _, metric := range cachedMetrics { + ch <- metric + } + + cacheAge := 0.0 + if !lastRefreshTime.IsZero() { + cacheAge = time.Since(lastRefreshTime).Seconds() + } + + if collector.metricFilter.Enabled("sonic_routing_entries_skipped") { + ch <- prometheus.MustNewConstMetric(collector.entriesSkipped, prometheus.GaugeValue, lastSkippedEntries) + } + if collector.metricFilter.Enabled("sonic_routing_entries_truncated") { + ch <- prometheus.MustNewConstMetric(collector.entriesTruncated, prometheus.GaugeValue, lastTruncated) + } + if collector.metricFilter.Enabled("sonic_routing_scrape_duration_seconds") { + ch <- prometheus.MustNewConstMetric(collector.scrapeDuration, prometheus.GaugeValue, lastScrapeDuration) + } + if collector.metricFilter.Enabled("sonic_routing_collector_success") { + ch <- prometheus.MustNewConstMetric(collector.scrapeCollectorSuccess, prometheus.GaugeValue, lastSuccess) + } + if collector.metricFilter.Enabled("sonic_routing_cache_age_seconds") { + ch <- prometheus.MustNewConstMetric(collector.cacheAge, prometheus.GaugeValue, cacheAge) + } +} + +func (collector *routingCollector) refreshLoop() { + ticker := time.NewTicker(collector.config.refreshInterval) + defer ticker.Stop() + + for range ticker.C { + collector.refreshMetrics() + } +} + +func (collector *routingCollector) refreshMetrics() { + start := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), collector.config.timeout) + defer cancel() + + metrics, skippedEntries, truncated, err := collector.scrapeMetrics(ctx) + scrapeDuration := time.Since(start).Seconds() + + collector.mu.Lock() + defer collector.mu.Unlock() + + collector.lastScrapeDuration = scrapeDuration + + if err != nil { + collector.lastSuccess = 0 + collector.logger.Error("Error refreshing routing metrics", "error", err) + return + } + + collector.cachedMetrics = metrics + collector.lastSkippedEntries = float64(skippedEntries) + collector.lastTruncated = truncated + collector.lastSuccess = 1 + collector.lastRefreshTime = time.Now() +} + +func (collector *routingCollector) scrapeMetrics(ctx context.Context) ([]prometheus.Metric, int, float64, error) { + redisClient, err := redis.NewClient() + if err != nil { + return nil, 0, 0, fmt.Errorf("redis client initialization failed: %w", err) + } + defer redisClient.Close() + + neighborKeys, err := redisClient.ScanKeysFromDb(ctx, "APPL_DB", "NEIGH_TABLE:*", collector.config.redisScanCount) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to scan neighbor keys: %w", err) + } + + routeKeys, err := redisClient.ScanKeysFromDb(ctx, "APPL_DB", "ROUTE_TABLE:*", collector.config.redisScanCount) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to scan route keys: %w", err) + } + + sort.Strings(neighborKeys) + sort.Strings(routeKeys) + + neighborCounts := map[string]float64{} + routeCounts := map[string]float64{} + skippedEntries := 0 + truncated := 0.0 + + for index, neighborKey := range neighborKeys { + if index >= collector.config.maxNeighbors { + truncated = 1 + skippedEntries += len(neighborKeys) - index + break + } + + suffix, err := parseKeySuffix(neighborKey, "NEIGH_TABLE:") + if err != nil { + skippedEntries++ + continue + } + parts := strings.SplitN(suffix, ":", 2) + if len(parts) != 2 { + skippedEntries++ + continue + } + + neighborData, err := redisClient.HgetAllFromDb(ctx, "APPL_DB", neighborKey) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read neighbor entry %s: %w", neighborKey, err) + } + if len(neighborData) == 0 { + skippedEntries++ + continue + } + + family := strings.ToLower(strings.TrimSpace(neighborData["family"])) + if family == "" { + family = familyFromPrefix(parts[1]) + } + + neighborCounts[parts[0]+"|"+family]++ + } + + for index, routeKey := range routeKeys { + if index >= collector.config.maxRoutes { + truncated = 1 + skippedEntries += len(routeKeys) - index + break + } + + prefix, err := parseKeySuffix(routeKey, "ROUTE_TABLE:") + if err != nil { + skippedEntries++ + continue + } + + routeData, err := redisClient.HgetAllFromDb(ctx, "APPL_DB", routeKey) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read route entry %s: %w", routeKey, err) + } + if len(routeData) == 0 { + skippedEntries++ + continue + } + + protocol := strings.ToLower(strings.TrimSpace(routeData["protocol"])) + if protocol == "" { + protocol = "unknown" + } + + routeCounts[familyFromPrefix(prefix)+"|"+protocol]++ + } + + metrics := make([]prometheus.Metric, 0, len(neighborCounts)+len(routeCounts)) + for _, key := range sortedMapKeys(neighborCounts) { + parts := strings.Split(key, "|") + if collector.metricFilter.Enabled("sonic_routing_neighbor_entries") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.neighborEntries, prometheus.GaugeValue, neighborCounts[key], parts[0], parts[1])) + } + } + + for _, key := range sortedMapKeys(routeCounts) { + parts := strings.Split(key, "|") + if collector.metricFilter.Enabled("sonic_routing_route_entries") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.routeEntries, prometheus.GaugeValue, routeCounts[key], parts[0], parts[1])) + } + } + + return metrics, skippedEntries, truncated, nil +} diff --git a/internal/collector/switch_collector.go b/internal/collector/switch_collector.go new file mode 100644 index 0000000..5c2f884 --- /dev/null +++ b/internal/collector/switch_collector.go @@ -0,0 +1,252 @@ +package collector + +import ( + "context" + "fmt" + "log/slog" + "sort" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/vinted/sonic-exporter/pkg/redis" +) + +type switchCollectorConfig struct { + enabled bool + refreshInterval time.Duration + timeout time.Duration + maxEntries int + redisScanCount int64 +} + +type switchCollector struct { + switchInfo *prometheus.Desc + switchFDBAgingSeconds *prometheus.Desc + switchEcmpHashSeed *prometheus.Desc + switchEcmpHashOffset *prometheus.Desc + switchLagHashSeed *prometheus.Desc + switchLagHashOffset *prometheus.Desc + switchOrderedEcmp *prometheus.Desc + entriesSkipped *prometheus.Desc + entriesTruncated *prometheus.Desc + scrapeDuration *prometheus.Desc + scrapeCollectorSuccess *prometheus.Desc + cacheAge *prometheus.Desc + + logger *slog.Logger + metricFilter MetricFilter + config switchCollectorConfig + + mu sync.RWMutex + cachedMetrics []prometheus.Metric + lastSuccess float64 + lastScrapeDuration float64 + lastSkippedEntries float64 + lastTruncated float64 + lastRefreshTime time.Time +} + +func NewSwitchCollector(logger *slog.Logger, metricFilter MetricFilter) *switchCollector { + const ( + namespace = "sonic" + subsystem = "switch" + ) + + collector := &switchCollector{ + switchInfo: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "info"), + "Switch table metadata, value is always 1", []string{"switch"}, nil), + switchFDBAgingSeconds: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "fdb_aging_time_seconds"), + "FDB aging time from SWITCH_TABLE", []string{"switch"}, nil), + switchEcmpHashSeed: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "ecmp_hash_seed"), + "ECMP hash seed from SWITCH_TABLE", []string{"switch"}, nil), + switchEcmpHashOffset: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "ecmp_hash_offset"), + "ECMP hash offset from SWITCH_TABLE", []string{"switch"}, nil), + switchLagHashSeed: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "lag_hash_seed"), + "LAG hash seed from SWITCH_TABLE", []string{"switch"}, nil), + switchLagHashOffset: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "lag_hash_offset"), + "LAG hash offset from SWITCH_TABLE", []string{"switch"}, nil), + switchOrderedEcmp: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "ordered_ecmp"), + "Whether ordered ECMP is enabled (1=yes, 0=no)", []string{"switch"}, nil), + entriesSkipped: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "entries_skipped"), + "Number of switch entries skipped during latest refresh", nil, nil), + entriesTruncated: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "entries_truncated"), + "Whether switch collection hit entry limits (1=yes, 0=no)", nil, nil), + scrapeDuration: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "scrape_duration_seconds"), + "Time it took for exporter to refresh switch metrics", nil, nil), + scrapeCollectorSuccess: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "collector_success"), + "Whether switch collector succeeded", nil, nil), + cacheAge: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "cache_age_seconds"), + "Age of latest switch cache refresh", nil, nil), + logger: logger, + metricFilter: metricFilter, + config: switchCollectorConfig{ + enabled: parseBoolEnv(logger, "SWITCH_ENABLED", true), + refreshInterval: parseDurationEnv(logger, "SWITCH_REFRESH_INTERVAL", 60*time.Second), + timeout: parseDurationEnv(logger, "SWITCH_TIMEOUT", 2*time.Second), + maxEntries: parseIntEnv(logger, "SWITCH_MAX_ENTRIES", 16), + redisScanCount: 32, + }, + } + + if !collector.config.enabled { + collector.logger.Info("Switch collector is disabled") + return collector + } + + collector.refreshMetrics() + go collector.refreshLoop() + + return collector +} + +func (collector *switchCollector) IsEnabled() bool { return collector.config.enabled } + +func (collector *switchCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- collector.switchInfo + ch <- collector.switchFDBAgingSeconds + ch <- collector.switchEcmpHashSeed + ch <- collector.switchEcmpHashOffset + ch <- collector.switchLagHashSeed + ch <- collector.switchLagHashOffset + ch <- collector.switchOrderedEcmp + ch <- collector.entriesSkipped + ch <- collector.entriesTruncated + ch <- collector.scrapeDuration + ch <- collector.scrapeCollectorSuccess + ch <- collector.cacheAge +} + +func (collector *switchCollector) Collect(ch chan<- prometheus.Metric) { + if !collector.config.enabled { + return + } + + collector.mu.RLock() + cachedMetrics := append([]prometheus.Metric{}, collector.cachedMetrics...) + lastScrapeDuration := collector.lastScrapeDuration + lastSuccess := collector.lastSuccess + lastSkippedEntries := collector.lastSkippedEntries + lastTruncated := collector.lastTruncated + lastRefreshTime := collector.lastRefreshTime + collector.mu.RUnlock() + + for _, metric := range cachedMetrics { + ch <- metric + } + + cacheAge := 0.0 + if !lastRefreshTime.IsZero() { + cacheAge = time.Since(lastRefreshTime).Seconds() + } + if collector.metricFilter.Enabled("sonic_switch_entries_skipped") { + ch <- prometheus.MustNewConstMetric(collector.entriesSkipped, prometheus.GaugeValue, lastSkippedEntries) + } + if collector.metricFilter.Enabled("sonic_switch_entries_truncated") { + ch <- prometheus.MustNewConstMetric(collector.entriesTruncated, prometheus.GaugeValue, lastTruncated) + } + if collector.metricFilter.Enabled("sonic_switch_scrape_duration_seconds") { + ch <- prometheus.MustNewConstMetric(collector.scrapeDuration, prometheus.GaugeValue, lastScrapeDuration) + } + if collector.metricFilter.Enabled("sonic_switch_collector_success") { + ch <- prometheus.MustNewConstMetric(collector.scrapeCollectorSuccess, prometheus.GaugeValue, lastSuccess) + } + if collector.metricFilter.Enabled("sonic_switch_cache_age_seconds") { + ch <- prometheus.MustNewConstMetric(collector.cacheAge, prometheus.GaugeValue, cacheAge) + } +} + +func (collector *switchCollector) refreshLoop() { + ticker := time.NewTicker(collector.config.refreshInterval) + defer ticker.Stop() + for range ticker.C { + collector.refreshMetrics() + } +} + +func (collector *switchCollector) refreshMetrics() { + start := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), collector.config.timeout) + defer cancel() + metrics, skippedEntries, truncated, err := collector.scrapeMetrics(ctx) + scrapeDuration := time.Since(start).Seconds() + + collector.mu.Lock() + defer collector.mu.Unlock() + collector.lastScrapeDuration = scrapeDuration + if err != nil { + collector.lastSuccess = 0 + collector.logger.Error("Error refreshing switch metrics", "error", err) + return + } + collector.cachedMetrics = metrics + collector.lastSkippedEntries = float64(skippedEntries) + collector.lastTruncated = truncated + collector.lastSuccess = 1 + collector.lastRefreshTime = time.Now() +} + +func (collector *switchCollector) scrapeMetrics(ctx context.Context) ([]prometheus.Metric, int, float64, error) { + redisClient, err := redis.NewClient() + if err != nil { + return nil, 0, 0, fmt.Errorf("redis client initialization failed: %w", err) + } + defer redisClient.Close() + + switchKeys, err := redisClient.ScanKeysFromDb(ctx, "APPL_DB", "SWITCH_TABLE:*", collector.config.redisScanCount) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to scan switch keys: %w", err) + } + sort.Strings(switchKeys) + + metrics := make([]prometheus.Metric, 0, len(switchKeys)*7) + skippedEntries := 0 + truncated := 0.0 + + for index, switchKey := range switchKeys { + if index >= collector.config.maxEntries { + truncated = 1 + skippedEntries += len(switchKeys) - index + break + } + + switchName, err := parseKeySuffix(switchKey, "SWITCH_TABLE:") + if err != nil { + skippedEntries++ + continue + } + + switchData, err := redisClient.HgetAllFromDb(ctx, "APPL_DB", switchKey) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read switch entry %s: %w", switchKey, err) + } + if len(switchData) == 0 { + skippedEntries++ + continue + } + + if collector.metricFilter.Enabled("sonic_switch_info") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.switchInfo, prometheus.GaugeValue, 1, switchName)) + } + if value, ok := parseCounterLike(switchData["fdb_aging_time"]); ok && collector.metricFilter.Enabled("sonic_switch_fdb_aging_time_seconds") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.switchFDBAgingSeconds, prometheus.GaugeValue, value, switchName)) + } + if value, ok := parseCounterLike(switchData["ecmp_hash_seed"]); ok && collector.metricFilter.Enabled("sonic_switch_ecmp_hash_seed") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.switchEcmpHashSeed, prometheus.GaugeValue, value, switchName)) + } + if value, ok := parseCounterLike(switchData["ecmp_hash_offset"]); ok && collector.metricFilter.Enabled("sonic_switch_ecmp_hash_offset") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.switchEcmpHashOffset, prometheus.GaugeValue, value, switchName)) + } + if value, ok := parseCounterLike(switchData["lag_hash_seed"]); ok && collector.metricFilter.Enabled("sonic_switch_lag_hash_seed") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.switchLagHashSeed, prometheus.GaugeValue, value, switchName)) + } + if value, ok := parseCounterLike(switchData["lag_hash_offset"]); ok && collector.metricFilter.Enabled("sonic_switch_lag_hash_offset") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.switchLagHashOffset, prometheus.GaugeValue, value, switchName)) + } + if value, ok := parseBoolish(switchData["ordered_ecmp"]); ok && collector.metricFilter.Enabled("sonic_switch_ordered_ecmp") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.switchOrderedEcmp, prometheus.GaugeValue, value, switchName)) + } + } + + return metrics, skippedEntries, truncated, nil +} diff --git a/internal/collector/thermal_collector.go b/internal/collector/thermal_collector.go new file mode 100644 index 0000000..9270484 --- /dev/null +++ b/internal/collector/thermal_collector.go @@ -0,0 +1,221 @@ +package collector + +import ( + "context" + "fmt" + "log/slog" + "sort" + "strings" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/vinted/sonic-exporter/pkg/redis" +) + +type thermalCollectorConfig struct { + enabled bool + refreshInterval time.Duration + timeout time.Duration + redisScanCount int64 +} + +type thermalCollector struct { + asicTemperature *prometheus.Desc + asicAverageTemperature *prometheus.Desc + asicMaximumTemperature *prometheus.Desc + sfpMaximumTemperature *prometheus.Desc + entriesSkipped *prometheus.Desc + scrapeDuration *prometheus.Desc + scrapeCollectorSuccess *prometheus.Desc + cacheAge *prometheus.Desc + + logger *slog.Logger + metricFilter MetricFilter + config thermalCollectorConfig + + mu sync.RWMutex + cachedMetrics []prometheus.Metric + lastSuccess float64 + lastScrapeDuration float64 + lastSkippedEntries float64 + lastRefreshTime time.Time +} + +func NewThermalCollector(logger *slog.Logger, metricFilter MetricFilter) *thermalCollector { + const ( + namespace = "sonic" + subsystem = "thermal" + ) + + collector := &thermalCollector{ + asicTemperature: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "asic_temperature_celsius"), + "ASIC per-sensor temperature in celsius", []string{"sensor"}, nil), + asicAverageTemperature: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "asic_average_temperature_celsius"), + "ASIC average temperature in celsius", nil, nil), + asicMaximumTemperature: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "asic_maximum_temperature_celsius"), + "ASIC maximum temperature in celsius", nil, nil), + sfpMaximumTemperature: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "sfp_maximum_temperature_celsius"), + "Maximum transceiver temperature across all optics", nil, nil), + entriesSkipped: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "entries_skipped"), + "Number of thermal entries skipped during latest refresh", nil, nil), + scrapeDuration: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "scrape_duration_seconds"), + "Time it took for exporter to refresh thermal metrics", nil, nil), + scrapeCollectorSuccess: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "collector_success"), + "Whether thermal collector succeeded", nil, nil), + cacheAge: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "cache_age_seconds"), + "Age of latest thermal cache refresh", nil, nil), + logger: logger, + metricFilter: metricFilter, + config: thermalCollectorConfig{ + enabled: parseBoolEnv(logger, "THERMAL_ENABLED", true), + refreshInterval: parseDurationEnv(logger, "THERMAL_REFRESH_INTERVAL", 60*time.Second), + timeout: parseDurationEnv(logger, "THERMAL_TIMEOUT", 2*time.Second), + redisScanCount: 32, + }, + } + + if !collector.config.enabled { + collector.logger.Info("Thermal collector is disabled") + return collector + } + + collector.refreshMetrics() + go collector.refreshLoop() + + return collector +} + +func (collector *thermalCollector) IsEnabled() bool { return collector.config.enabled } + +func (collector *thermalCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- collector.asicTemperature + ch <- collector.asicAverageTemperature + ch <- collector.asicMaximumTemperature + ch <- collector.sfpMaximumTemperature + ch <- collector.entriesSkipped + ch <- collector.scrapeDuration + ch <- collector.scrapeCollectorSuccess + ch <- collector.cacheAge +} + +func (collector *thermalCollector) Collect(ch chan<- prometheus.Metric) { + if !collector.config.enabled { + return + } + + collector.mu.RLock() + cachedMetrics := append([]prometheus.Metric{}, collector.cachedMetrics...) + lastScrapeDuration := collector.lastScrapeDuration + lastSuccess := collector.lastSuccess + lastSkippedEntries := collector.lastSkippedEntries + lastRefreshTime := collector.lastRefreshTime + collector.mu.RUnlock() + + for _, metric := range cachedMetrics { + ch <- metric + } + + cacheAge := 0.0 + if !lastRefreshTime.IsZero() { + cacheAge = time.Since(lastRefreshTime).Seconds() + } + if collector.metricFilter.Enabled("sonic_thermal_entries_skipped") { + ch <- prometheus.MustNewConstMetric(collector.entriesSkipped, prometheus.GaugeValue, lastSkippedEntries) + } + if collector.metricFilter.Enabled("sonic_thermal_scrape_duration_seconds") { + ch <- prometheus.MustNewConstMetric(collector.scrapeDuration, prometheus.GaugeValue, lastScrapeDuration) + } + if collector.metricFilter.Enabled("sonic_thermal_collector_success") { + ch <- prometheus.MustNewConstMetric(collector.scrapeCollectorSuccess, prometheus.GaugeValue, lastSuccess) + } + if collector.metricFilter.Enabled("sonic_thermal_cache_age_seconds") { + ch <- prometheus.MustNewConstMetric(collector.cacheAge, prometheus.GaugeValue, cacheAge) + } +} + +func (collector *thermalCollector) refreshLoop() { + ticker := time.NewTicker(collector.config.refreshInterval) + defer ticker.Stop() + for range ticker.C { + collector.refreshMetrics() + } +} + +func (collector *thermalCollector) refreshMetrics() { + start := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), collector.config.timeout) + defer cancel() + metrics, skippedEntries, err := collector.scrapeMetrics(ctx) + scrapeDuration := time.Since(start).Seconds() + + collector.mu.Lock() + defer collector.mu.Unlock() + collector.lastScrapeDuration = scrapeDuration + if err != nil { + collector.lastSuccess = 0 + collector.logger.Error("Error refreshing thermal metrics", "error", err) + return + } + collector.cachedMetrics = metrics + collector.lastSkippedEntries = float64(skippedEntries) + collector.lastSuccess = 1 + collector.lastRefreshTime = time.Now() +} + +func (collector *thermalCollector) scrapeMetrics(ctx context.Context) ([]prometheus.Metric, int, error) { + redisClient, err := redis.NewClient() + if err != nil { + return nil, 0, fmt.Errorf("redis client initialization failed: %w", err) + } + defer redisClient.Close() + + metrics := []prometheus.Metric{} + skippedEntries := 0 + + asicTemperatureData, err := redisClient.HgetAllFromDb(ctx, "STATE_DB", "ASIC_TEMPERATURE_INFO") + if err != nil { + return nil, 0, fmt.Errorf("failed to read ASIC_TEMPERATURE_INFO: %w", err) + } + + asicFields := make([]string, 0, len(asicTemperatureData)) + for field := range asicTemperatureData { + asicFields = append(asicFields, field) + } + sort.Strings(asicFields) + for _, field := range asicFields { + value := asicTemperatureData[field] + parsedValue, ok := parseCounterLike(value) + if !ok { + skippedEntries++ + continue + } + + switch { + case strings.HasPrefix(field, "temperature_"): + if collector.metricFilter.Enabled("sonic_thermal_asic_temperature_celsius") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.asicTemperature, prometheus.GaugeValue, parsedValue, field)) + } + case field == "average_temperature": + if collector.metricFilter.Enabled("sonic_thermal_asic_average_temperature_celsius") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.asicAverageTemperature, prometheus.GaugeValue, parsedValue)) + } + case field == "maximum_temperature": + if collector.metricFilter.Enabled("sonic_thermal_asic_maximum_temperature_celsius") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.asicMaximumTemperature, prometheus.GaugeValue, parsedValue)) + } + } + } + + sfpMaxData, err := redisClient.HgetAllFromDb(ctx, "STATE_DB", "TEMPERATURE_SFP_MAX") + if err != nil { + return nil, 0, fmt.Errorf("failed to read TEMPERATURE_SFP_MAX: %w", err) + } + if value, ok := parseCounterLike(sfpMaxData["maximum_temperature"]); ok { + if collector.metricFilter.Enabled("sonic_thermal_sfp_maximum_temperature_celsius") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.sfpMaximumTemperature, prometheus.GaugeValue, value)) + } + } + + return metrics, skippedEntries, nil +} diff --git a/internal/collector/transceiver_collector.go b/internal/collector/transceiver_collector.go new file mode 100644 index 0000000..184ecc6 --- /dev/null +++ b/internal/collector/transceiver_collector.go @@ -0,0 +1,358 @@ +package collector + +import ( + "context" + "fmt" + "log/slog" + "sort" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/vinted/sonic-exporter/pkg/redis" +) + +type transceiverCollectorConfig struct { + enabled bool + refreshInterval time.Duration + timeout time.Duration + maxPorts int + redisScanCount int64 +} + +type transceiverCollector struct { + moduleInfo *prometheus.Desc + statusValue *prometheus.Desc + statusFlagValue *prometheus.Desc + statusFlagChanges *prometheus.Desc + statusFlagLastSet *prometheus.Desc + statusFlagLastClear *prometheus.Desc + domFlagValue *prometheus.Desc + domFlagChanges *prometheus.Desc + domFlagLastSet *prometheus.Desc + domFlagLastClear *prometheus.Desc + domThresholdValue *prometheus.Desc + entriesSkipped *prometheus.Desc + entriesTruncated *prometheus.Desc + scrapeDuration *prometheus.Desc + scrapeCollectorSuccess *prometheus.Desc + cacheAge *prometheus.Desc + + logger *slog.Logger + metricFilter MetricFilter + config transceiverCollectorConfig + + mu sync.RWMutex + cachedMetrics []prometheus.Metric + lastSuccess float64 + lastScrapeDuration float64 + lastSkippedEntries float64 + lastTruncated float64 + lastRefreshTime time.Time +} + +func NewTransceiverCollector(logger *slog.Logger, metricFilter MetricFilter) *transceiverCollector { + const ( + namespace = "sonic" + subsystem = "transceiver" + ) + + collector := &transceiverCollector{ + moduleInfo: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "module_info"), + "Transceiver module state metadata, value is always 1", []string{"device", "module_state", "module_fault_cause"}, nil), + statusValue: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "status_value"), + "Transceiver status values from STATE_DB TRANSCEIVER_STATUS", []string{"device", "field"}, nil), + statusFlagValue: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "status_flag_value"), + "Transceiver status flag value from STATE_DB TRANSCEIVER_STATUS_FLAG", []string{"device", "flag"}, nil), + statusFlagChanges: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "status_flag_changes_total"), + "Transceiver status flag change count", []string{"device", "flag"}, nil), + statusFlagLastSet: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "status_flag_last_set_timestamp_seconds"), + "Unix timestamp when a transceiver status flag was last set", []string{"device", "flag"}, nil), + statusFlagLastClear: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "status_flag_last_clear_timestamp_seconds"), + "Unix timestamp when a transceiver status flag was last cleared", []string{"device", "flag"}, nil), + domFlagValue: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "dom_flag_value"), + "Transceiver DOM flag value from STATE_DB TRANSCEIVER_DOM_FLAG", []string{"device", "flag"}, nil), + domFlagChanges: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "dom_flag_changes_total"), + "Transceiver DOM flag change count", []string{"device", "flag"}, nil), + domFlagLastSet: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "dom_flag_last_set_timestamp_seconds"), + "Unix timestamp when a transceiver DOM flag was last set", []string{"device", "flag"}, nil), + domFlagLastClear: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "dom_flag_last_clear_timestamp_seconds"), + "Unix timestamp when a transceiver DOM flag was last cleared", []string{"device", "flag"}, nil), + domThresholdValue: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "dom_threshold_value"), + "Transceiver DOM threshold values", []string{"device", "threshold"}, nil), + entriesSkipped: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "entries_skipped"), + "Number of transceiver entries skipped during latest refresh", nil, nil), + entriesTruncated: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "entries_truncated"), + "Whether transceiver collection hit port limits (1=yes, 0=no)", nil, nil), + scrapeDuration: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "scrape_duration_seconds"), + "Time it took for exporter to refresh transceiver metrics", nil, nil), + scrapeCollectorSuccess: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "collector_success"), + "Whether transceiver collector succeeded", nil, nil), + cacheAge: prometheus.NewDesc(prometheus.BuildFQName(namespace, subsystem, "cache_age_seconds"), + "Age of latest transceiver cache refresh", nil, nil), + logger: logger, + metricFilter: metricFilter, + config: transceiverCollectorConfig{ + enabled: parseBoolEnv(logger, "TRANSCEIVER_ENABLED", true), + refreshInterval: parseDurationEnv(logger, "TRANSCEIVER_REFRESH_INTERVAL", 60*time.Second), + timeout: parseDurationEnv(logger, "TRANSCEIVER_TIMEOUT", 2*time.Second), + maxPorts: parseIntEnv(logger, "TRANSCEIVER_MAX_PORTS", 1024), + redisScanCount: 128, + }, + } + + if !collector.config.enabled { + collector.logger.Info("Transceiver collector is disabled") + return collector + } + + collector.refreshMetrics() + go collector.refreshLoop() + + return collector +} + +func (collector *transceiverCollector) IsEnabled() bool { return collector.config.enabled } + +func (collector *transceiverCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- collector.moduleInfo + ch <- collector.statusValue + ch <- collector.statusFlagValue + ch <- collector.statusFlagChanges + ch <- collector.statusFlagLastSet + ch <- collector.statusFlagLastClear + ch <- collector.domFlagValue + ch <- collector.domFlagChanges + ch <- collector.domFlagLastSet + ch <- collector.domFlagLastClear + ch <- collector.domThresholdValue + ch <- collector.entriesSkipped + ch <- collector.entriesTruncated + ch <- collector.scrapeDuration + ch <- collector.scrapeCollectorSuccess + ch <- collector.cacheAge +} + +func (collector *transceiverCollector) Collect(ch chan<- prometheus.Metric) { + if !collector.config.enabled { + return + } + + collector.mu.RLock() + cachedMetrics := append([]prometheus.Metric{}, collector.cachedMetrics...) + lastScrapeDuration := collector.lastScrapeDuration + lastSuccess := collector.lastSuccess + lastSkippedEntries := collector.lastSkippedEntries + lastTruncated := collector.lastTruncated + lastRefreshTime := collector.lastRefreshTime + collector.mu.RUnlock() + + for _, metric := range cachedMetrics { + ch <- metric + } + + cacheAge := 0.0 + if !lastRefreshTime.IsZero() { + cacheAge = time.Since(lastRefreshTime).Seconds() + } + if collector.metricFilter.Enabled("sonic_transceiver_entries_skipped") { + ch <- prometheus.MustNewConstMetric(collector.entriesSkipped, prometheus.GaugeValue, lastSkippedEntries) + } + if collector.metricFilter.Enabled("sonic_transceiver_entries_truncated") { + ch <- prometheus.MustNewConstMetric(collector.entriesTruncated, prometheus.GaugeValue, lastTruncated) + } + if collector.metricFilter.Enabled("sonic_transceiver_scrape_duration_seconds") { + ch <- prometheus.MustNewConstMetric(collector.scrapeDuration, prometheus.GaugeValue, lastScrapeDuration) + } + if collector.metricFilter.Enabled("sonic_transceiver_collector_success") { + ch <- prometheus.MustNewConstMetric(collector.scrapeCollectorSuccess, prometheus.GaugeValue, lastSuccess) + } + if collector.metricFilter.Enabled("sonic_transceiver_cache_age_seconds") { + ch <- prometheus.MustNewConstMetric(collector.cacheAge, prometheus.GaugeValue, cacheAge) + } +} + +func (collector *transceiverCollector) refreshLoop() { + ticker := time.NewTicker(collector.config.refreshInterval) + defer ticker.Stop() + for range ticker.C { + collector.refreshMetrics() + } +} + +func (collector *transceiverCollector) refreshMetrics() { + start := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), collector.config.timeout) + defer cancel() + metrics, skippedEntries, truncated, err := collector.scrapeMetrics(ctx) + scrapeDuration := time.Since(start).Seconds() + + collector.mu.Lock() + defer collector.mu.Unlock() + collector.lastScrapeDuration = scrapeDuration + if err != nil { + collector.lastSuccess = 0 + collector.logger.Error("Error refreshing transceiver metrics", "error", err) + return + } + collector.cachedMetrics = metrics + collector.lastSkippedEntries = float64(skippedEntries) + collector.lastTruncated = truncated + collector.lastSuccess = 1 + collector.lastRefreshTime = time.Now() +} + +func (collector *transceiverCollector) scrapeMetrics(ctx context.Context) ([]prometheus.Metric, int, float64, error) { + redisClient, err := redis.NewClient() + if err != nil { + return nil, 0, 0, fmt.Errorf("redis client initialization failed: %w", err) + } + defer redisClient.Close() + + statusKeys, err := redisClient.ScanKeysFromDb(ctx, "STATE_DB", "TRANSCEIVER_STATUS|*", collector.config.redisScanCount) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to scan transceiver status keys: %w", err) + } + sort.Strings(statusKeys) + + metrics := []prometheus.Metric{} + skippedEntries := 0 + truncated := 0.0 + + for index, statusKey := range statusKeys { + if index >= collector.config.maxPorts { + truncated = 1 + skippedEntries += len(statusKeys) - index + break + } + + device, err := parseKeySuffix(statusKey, "TRANSCEIVER_STATUS|") + if err != nil { + skippedEntries++ + continue + } + + statusData, err := redisClient.HgetAllFromDb(ctx, "STATE_DB", statusKey) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read transceiver status entry %s: %w", statusKey, err) + } + if len(statusData) == 0 { + skippedEntries++ + continue + } + + if collector.metricFilter.Enabled("sonic_transceiver_module_info") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.moduleInfo, prometheus.GaugeValue, 1, device, statusData["module_state"], statusData["module_fault_cause"])) + } + + statusFields := make([]string, 0, len(statusData)) + for field := range statusData { + statusFields = append(statusFields, field) + } + sort.Strings(statusFields) + for _, field := range statusFields { + if field == "module_state" || field == "module_fault_cause" || field == "last_update_time" { + continue + } + + if value, ok := parseBoolish(statusData[field]); ok && collector.metricFilter.Enabled("sonic_transceiver_status_value") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.statusValue, prometheus.GaugeValue, value, device, field)) + } + } + + domFlagData, err := redisClient.HgetAllFromDb(ctx, "STATE_DB", "TRANSCEIVER_DOM_FLAG|"+device) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read transceiver DOM flag entry for %s: %w", device, err) + } + domFlagChangesData, err := redisClient.HgetAllFromDb(ctx, "STATE_DB", "TRANSCEIVER_DOM_FLAG_CHANGE_COUNT|"+device) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read transceiver DOM flag change-count entry for %s: %w", device, err) + } + domFlagSetData, err := redisClient.HgetAllFromDb(ctx, "STATE_DB", "TRANSCEIVER_DOM_FLAG_SET_TIME|"+device) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read transceiver DOM flag set-time entry for %s: %w", device, err) + } + domFlagClearData, err := redisClient.HgetAllFromDb(ctx, "STATE_DB", "TRANSCEIVER_DOM_FLAG_CLEAR_TIME|"+device) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read transceiver DOM flag clear-time entry for %s: %w", device, err) + } + collector.appendTransceiverFlags(metrics, &metrics, collector.domFlagValue, collector.domFlagChanges, collector.domFlagLastSet, collector.domFlagLastClear, device, domFlagData, + domFlagChangesData, + domFlagSetData, + domFlagClearData, + collector.metricFilter.Enabled("sonic_transceiver_dom_flag_value"), + collector.metricFilter.Enabled("sonic_transceiver_dom_flag_changes_total"), + collector.metricFilter.Enabled("sonic_transceiver_dom_flag_last_set_timestamp_seconds"), + collector.metricFilter.Enabled("sonic_transceiver_dom_flag_last_clear_timestamp_seconds"), + ) + + statusFlagData, err := redisClient.HgetAllFromDb(ctx, "STATE_DB", "TRANSCEIVER_STATUS_FLAG|"+device) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read transceiver status flag entry for %s: %w", device, err) + } + statusFlagChangesData, err := redisClient.HgetAllFromDb(ctx, "STATE_DB", "TRANSCEIVER_STATUS_FLAG_CHANGE_COUNT|"+device) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read transceiver status flag change-count entry for %s: %w", device, err) + } + statusFlagSetData, err := redisClient.HgetAllFromDb(ctx, "STATE_DB", "TRANSCEIVER_STATUS_FLAG_SET_TIME|"+device) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read transceiver status flag set-time entry for %s: %w", device, err) + } + statusFlagClearData, err := redisClient.HgetAllFromDb(ctx, "STATE_DB", "TRANSCEIVER_STATUS_FLAG_CLEAR_TIME|"+device) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read transceiver status flag clear-time entry for %s: %w", device, err) + } + collector.appendTransceiverFlags(metrics, &metrics, collector.statusFlagValue, collector.statusFlagChanges, collector.statusFlagLastSet, collector.statusFlagLastClear, device, statusFlagData, + statusFlagChangesData, + statusFlagSetData, + statusFlagClearData, + collector.metricFilter.Enabled("sonic_transceiver_status_flag_value"), + collector.metricFilter.Enabled("sonic_transceiver_status_flag_changes_total"), + collector.metricFilter.Enabled("sonic_transceiver_status_flag_last_set_timestamp_seconds"), + collector.metricFilter.Enabled("sonic_transceiver_status_flag_last_clear_timestamp_seconds"), + ) + + thresholdData, err := redisClient.HgetAllFromDb(ctx, "STATE_DB", "TRANSCEIVER_DOM_THRESHOLD|"+device) + if err != nil { + return nil, 0, 0, fmt.Errorf("failed to read transceiver threshold entry for %s: %w", device, err) + } + thresholdFields := make([]string, 0, len(thresholdData)) + for field := range thresholdData { + thresholdFields = append(thresholdFields, field) + } + sort.Strings(thresholdFields) + for _, field := range thresholdFields { + if field == "last_update_time" { + continue + } + if value, ok := parseCounterLike(thresholdData[field]); ok && collector.metricFilter.Enabled("sonic_transceiver_dom_threshold_value") { + metrics = append(metrics, prometheus.MustNewConstMetric(collector.domThresholdValue, prometheus.GaugeValue, value, device, field)) + } + } + } + + return metrics, skippedEntries, truncated, nil +} + +func (collector *transceiverCollector) appendTransceiverFlags(_ []prometheus.Metric, metrics *[]prometheus.Metric, valueDesc, changeDesc, setDesc, clearDesc *prometheus.Desc, device string, values, changes, setTimes, clearTimes map[string]string, emitValues, emitChanges, emitSet, emitClear bool) { + fields := make([]string, 0, len(values)) + for field := range values { + fields = append(fields, field) + } + sort.Strings(fields) + + for _, field := range fields { + if value, ok := parseBoolish(values[field]); ok && emitValues { + *metrics = append(*metrics, prometheus.MustNewConstMetric(valueDesc, prometheus.GaugeValue, value, device, field)) + } + if value, ok := parseCounterLike(changes[field]); ok && emitChanges { + *metrics = append(*metrics, prometheus.MustNewConstMetric(changeDesc, prometheus.CounterValue, value, device, field)) + } + if value, ok := parseEventTime(setTimes[field]); ok && emitSet { + *metrics = append(*metrics, prometheus.MustNewConstMetric(setDesc, prometheus.GaugeValue, value, device, field)) + } + if value, ok := parseEventTime(clearTimes[field]); ok && emitClear { + *metrics = append(*metrics, prometheus.MustNewConstMetric(clearDesc, prometheus.GaugeValue, value, device, field)) + } + } +}