Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions commons/pkg/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
ServiceFaultQuarantine = "fault-quarantine"
ServiceEventExporter = "event-exporter"
ServiceHealthEventsAnalyzer = "health-events-analyzer"
ServiceNodeDrainer = "node-drainer"
)

// MetadataKeyTraceID is the key used to store the trace ID in the health event's
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ spec:
value: {{ $certMountPath }}
{{- end }}
{{- end }}
{{- if .Values.global.tracing.enabled }}
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: {{ .Values.global.tracing.endpoint | quote }}
- name: OTEL_EXPORTER_OTLP_INSECURE
value: {{ .Values.global.tracing.insecure | quote }}
{{- end }}
envFrom:
- configMapRef:
name: {{ if .Values.global.datastore }}{{ .Release.Name }}-datastore-config{{ else }}mongodb-config{{ end }}
Expand Down
4 changes: 2 additions & 2 deletions node-drainer/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ require (
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/otel v1.43.0
go.opentelemetry.io/otel/trace v1.43.0
golang.org/x/sync v0.20.0
google.golang.org/protobuf v1.36.11
k8s.io/api v0.35.3
Expand Down Expand Up @@ -79,12 +81,10 @@ require (
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.68.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 // indirect
go.opentelemetry.io/otel v1.43.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.43.0 // indirect
go.opentelemetry.io/otel/metric v1.43.0 // indirect
go.opentelemetry.io/otel/sdk v1.43.0 // indirect
go.opentelemetry.io/otel/trace v1.43.0 // indirect
go.opentelemetry.io/proto/otlp v1.10.0 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
Expand Down
80 changes: 48 additions & 32 deletions node-drainer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os/signal"
"strconv"
"syscall"
"time"

"golang.org/x/sync/errgroup"

Expand All @@ -32,6 +33,7 @@ import (
"github.com/nvidia/nvsentinel/commons/pkg/logger"
metrics "github.com/nvidia/nvsentinel/commons/pkg/metrics"
"github.com/nvidia/nvsentinel/commons/pkg/server"
"github.com/nvidia/nvsentinel/commons/pkg/tracing"
"github.com/nvidia/nvsentinel/data-models/pkg/model"
"github.com/nvidia/nvsentinel/node-drainer/pkg/initializer"
"github.com/nvidia/nvsentinel/store-client/pkg/client"
Expand Down Expand Up @@ -63,15 +65,30 @@ func (d *dataStoreAdapter) FindDocuments(ctx context.Context, filter interface{}
}

func main() {
logger.SetDefaultStructuredLogger("node-drainer", version)
logger.SetDefaultStructuredLoggerWithTraceCorrelation("node-drainer", version)
slog.Info("Starting node-drainer", "version", version, "commit", commit, "date", date)

if err := auditlogger.InitAuditLogger("node-drainer"); err != nil {
slog.Warn("Failed to initialize audit logger", "error", err)
}

if err := run(); err != nil {
slog.Error("Node drainer module exited with error", "error", err)
// Initialize OpenTelemetry tracing
if err := tracing.InitTracing(tracing.ServiceNodeDrainer); err != nil {
slog.Warn("Failed to initialize tracing", "error", err)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

runErr := run()

tracingCtx, tracingCancel := context.WithTimeout(context.Background(), 5*time.Second)

if err := tracing.ShutdownTracing(tracingCtx); err != nil {
slog.Warn("Failed to shutdown tracing", "error", err)
}

tracingCancel()

if runErr != nil {
slog.Error("Node drainer module exited with error", "error", runErr)

if closeErr := auditlogger.CloseAuditLogger(); closeErr != nil {
slog.Warn("Failed to close audit logger", "error", closeErr)
Expand Down Expand Up @@ -108,7 +125,7 @@ func run() error {
// Resolve the certificate path using common logic
databaseClientCertMountPath := certConfig.ResolveCertPath()

slog.Info("Database client cert", "path", databaseClientCertMountPath)
slog.InfoContext(ctx, "Database client cert", "path", databaseClientCertMountPath)

params := initializer.InitializationParams{
DatabaseClientCertMountPath: databaseClientCertMountPath,
Expand Down Expand Up @@ -139,40 +156,39 @@ func run() error {
ff.Set("custom_drain", components.CustomDrainEnabled)

// Informers must sync before processing events
slog.Info("Starting Kubernetes informers")
slog.InfoContext(gCtx, "Starting Kubernetes informers")

if err := components.Informers.Run(gCtx); err != nil {
return fmt.Errorf("failed to start informers: %w", err)
}

slog.Info("Kubernetes informers started and synced")
slog.InfoContext(gCtx, "Kubernetes informers started and synced")

slog.Info("Starting queue worker")
slog.InfoContext(gCtx, "Starting queue worker")
components.QueueManager.Start(gCtx)

// Handle cold start - re-process any events that were in-progress during restart
slog.Info("Handling cold start")
slog.InfoContext(gCtx, "Handling cold start")

if err := handleColdStart(gCtx, components); err != nil {
slog.Error("Cold start handling failed", "error", err)
slog.ErrorContext(gCtx, "Cold start handling failed", "error", err)
}

slog.Info("Starting database event watcher")
slog.InfoContext(gCtx, "Starting database event watcher")

criticalError := make(chan error)
startEventWatcher(gCtx, components, criticalError)

slog.Info("All components started successfully")
slog.InfoContext(gCtx, "All components started successfully")

// Monitor for critical errors or graceful shutdown signals.
g.Go(func() error {
select {
case <-gCtx.Done():
// Context was cancelled (SIGTERM/SIGINT or another goroutine failed)
slog.Info("Context cancelled, initiating shutdown")
slog.InfoContext(gCtx, "Context cancelled, initiating shutdown")
case err := <-criticalError:
// Critical component (event watcher) failed
slog.Error("Critical component failure", "error", err)
slog.ErrorContext(gCtx, "Critical component failure", "error", err)
stop() // Cancel context to trigger shutdown of other components

if shutdownErr := shutdownComponents(ctx, components); shutdownErr != nil {
Expand Down Expand Up @@ -209,10 +225,10 @@ func createMetricsServer(metricsPort string) (server.Server, error) {
// startMetricsServer starts the metrics server in an errgroup
func startMetricsServer(g *errgroup.Group, gCtx context.Context, srv server.Server) {
g.Go(func() error {
slog.Info("Starting metrics server")
slog.InfoContext(gCtx, "Starting metrics server")

if err := srv.Serve(gCtx); err != nil {
slog.Error("Metrics server failed - continuing without metrics", "error", err)
slog.ErrorContext(gCtx, "Metrics server failed - continuing without metrics", "error", err)
}

return nil
Expand All @@ -223,23 +239,23 @@ func startMetricsServer(g *errgroup.Group, gCtx context.Context, srv server.Serv
func startEventWatcher(ctx context.Context, components *initializer.Components, criticalError chan<- error) {
go func() {
if components.EventWatcher == nil {
slog.Warn("No event watcher available")
slog.WarnContext(ctx, "No event watcher available")
<-ctx.Done()

return
}

// Start the change stream watcher
components.EventWatcher.Start(ctx)
slog.Info("Event watcher started, consuming events")
slog.InfoContext(ctx, "Event watcher started, consuming events")

// Consume events from the change stream
for event := range components.EventWatcher.Events() {
// Preprocess and enqueue the event
// This sets the initial status to InProgress and enqueues the event for processing
if err := components.Reconciler.PreprocessAndEnqueueEvent(ctx, event); err != nil {
// Don't send to criticalError - just log and continue processing other events
slog.Error("Failed to preprocess and enqueue event", "error", err)
slog.ErrorContext(ctx, "Failed to preprocess and enqueue event", "error", err)
continue
}

Expand All @@ -248,19 +264,19 @@ func startEventWatcher(ctx context.Context, components *initializer.Components,
resumeToken := event.GetResumeToken()
if err := components.EventWatcher.MarkProcessed(ctx, resumeToken); err != nil {
// Don't send to criticalError - just log and continue
slog.Error("Error updating resume token", "error", err)
slog.ErrorContext(ctx, "Error updating resume token", "error", err)
}
}

// The event channel closed. If the context is still active, this means the
// change stream died unexpectedly (e.g., MongoDB error). Signal a critical
// failure so the pod exits and Kubernetes restarts it.
if ctx.Err() == nil {
slog.Error("Event watcher channel closed unexpectedly, event processing has stopped")
slog.ErrorContext(ctx, "Event watcher channel closed unexpectedly, event processing has stopped")

criticalError <- fmt.Errorf("event watcher channel closed unexpectedly")
} else {
slog.Info("Event watcher stopped")
slog.InfoContext(ctx, "Event watcher stopped")
}
}()
}
Expand All @@ -272,7 +288,7 @@ const coldStartBatchSize = 1000
// unbounded memory usage. All matching events are loaded (not just latest per node)
// because a single node can have multiple concurrent partial drains.
func handleColdStart(ctx context.Context, components *initializer.Components) error {
slog.Info("Querying for events requiring processing")
slog.InfoContext(ctx, "Querying for events requiring processing")

q := query.New().Build(
query.Or(
Expand Down Expand Up @@ -303,34 +319,34 @@ func handleColdStart(ctx context.Context, components *initializer.Components) er
for _, he := range batch {
event := he.RawEvent
if len(event) == 0 {
slog.Error("RawEvent is empty, skipping cold start event")
slog.ErrorContext(ctx, "RawEvent is empty, skipping cold start event")

continue
}

parsedEvent, err := eventutil.ParseHealthEventFromEvent(event)
if err != nil {
slog.Error("Failed to parse health event from cold start event", "error", err)
slog.ErrorContext(ctx, "Failed to parse health event from cold start event", "error", err)

continue
}

if parsedEvent.HealthEvent == nil {
slog.Error("Health event is nil in cold start event")
slog.ErrorContext(ctx, "Health event is nil in cold start event")

continue
}

nodeName := parsedEvent.HealthEvent.GetNodeName()
if nodeName == "" {
slog.Error("Node name is empty in cold start event")
slog.ErrorContext(ctx, "Node name is empty in cold start event")

continue
}

documentID, err := utils.ExtractDocumentIDNative(event)
if err != nil {
slog.Error("Failed to extract document ID from cold start event", "error", err)
slog.ErrorContext(ctx, "Failed to extract document ID from cold start event", "error", err)

continue
}
Expand All @@ -339,7 +355,7 @@ func handleColdStart(ctx context.Context, components *initializer.Components) er
ctx, nodeName, event, dbAdapter, healthStore, documentID); enqueueErr != nil {
slog.Error("Failed to enqueue cold start event", "error", enqueueErr, "nodeName", nodeName)
} else {
slog.Info("Re-queued event from cold start", "nodeName", nodeName)
slog.InfoContext(ctx, "Re-queued event from cold start", "nodeName", nodeName)
}
}

Expand All @@ -349,14 +365,14 @@ func handleColdStart(ctx context.Context, components *initializer.Components) er
return fmt.Errorf("failed to process cold start events: %w", err)
}

slog.Info("Cold start processing completed")
slog.InfoContext(ctx, "Cold start processing completed")

return nil
}

// shutdownComponents handles the shutdown of components
func shutdownComponents(ctx context.Context, components *initializer.Components) error {
slog.Info("Shutting down node drainer")
slog.InfoContext(ctx, "Shutting down node drainer")

if components.EventWatcher != nil {
if errStop := components.EventWatcher.Close(ctx); errStop != nil {
Expand All @@ -365,7 +381,7 @@ func shutdownComponents(ctx context.Context, components *initializer.Components)
}

components.QueueManager.Shutdown()
slog.Info("Node drainer stopped")
slog.InfoContext(ctx, "Node drainer stopped")

return nil
}
Loading
Loading