diff --git a/pkg/sloop/common/logging.go b/pkg/sloop/common/logging.go index 7eb42fa8..48ef2201 100644 --- a/pkg/sloop/common/logging.go +++ b/pkg/sloop/common/logging.go @@ -1,4 +1,3 @@ - package common import "github.com/golang/glog" diff --git a/pkg/sloop/ingress/kubewatcher_test.go b/pkg/sloop/ingress/kubewatcher_test.go index d0fe6e08..b5905fbf 100644 --- a/pkg/sloop/ingress/kubewatcher_test.go +++ b/pkg/sloop/ingress/kubewatcher_test.go @@ -156,22 +156,22 @@ func Test_bigPictureWithExclusionRules(t *testing.T) { t.Fatalf("Error creating service: %v\n", err) } + expectedEvents := 3 eventCount := 0 -loop: - for { + deadline := time.After(10 * time.Second) + for eventCount < expectedEvents { select { - case <-time.After(1 * time.Second): - break loop + case <-deadline: + t.Fatalf("Timed out waiting for events: got %d, expected %d", eventCount, expectedEvents) case result, ok := <-outChan: - if ok { - eventCount++ - assert.NotContains(t, result.Payload, `"name":"s2"`) - } else { - t.Fatalf("Channel closed unexpectedly: %v\n", ok) + if !ok { + t.Fatalf("Channel closed unexpectedly") } + eventCount++ + assert.NotContains(t, result.Payload, `"name":"s2"`) } } - assert.Equal(t, 3, eventCount) // assert no event for service named s2 + assert.Equal(t, expectedEvents, eventCount) kw.Stop() } diff --git a/pkg/sloop/kubeextractor/payloadhash.go b/pkg/sloop/kubeextractor/payloadhash.go new file mode 100644 index 00000000..c0d217fa --- /dev/null +++ b/pkg/sloop/kubeextractor/payloadhash.go @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2019, salesforce.com, inc. + * All rights reserved. + * SPDX-License-Identifier: BSD-3-Clause + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package kubeextractor + +import ( + "hash/fnv" + + "github.com/Jeffail/gabs/v2" + "github.com/pkg/errors" +) + +// VolatileFields are JSON paths stripped before hashing for dedup. 
// VolatileFields lists JSON paths that are stripped from a resource payload
// before hashing for dedup. The metadata fields below are touched by the API
// server on every write but don't represent real changes. The entire "status"
// block is stripped because status changes constantly (replicas, metrics,
// container states) while spec/labels/annotations changes are what matter for
// debugging. Full payloads (including status) are still written on every
// actual change and on every periodic snapshot.
var VolatileFields = [][]string{
	{"metadata", "resourceVersion"},
	{"metadata", "generation"},
	{"metadata", "uid"},
	{"metadata", "selfLink"},
	{"metadata", "managedFields"},
	{"status"},
}

// ComputePayloadHash computes an FNV-64a hash of the payload after stripping
// volatile fields. The returned uint64 is compared in memory only and is
// never persisted, so the exact hash algorithm may change between releases.
func ComputePayloadHash(payload string) (uint64, error) {
	cleanPayload, err := stripVolatileFields(payload)
	if err != nil {
		return 0, err
	}

	h := fnv.New64a()
	// hash.Hash.Write never fails for fnv, but check to stay vet-clean.
	if _, err := h.Write([]byte(cleanPayload)); err != nil {
		return 0, fmt.Errorf("hashing payload: %w", err)
	}
	return h.Sum64(), nil
}

// stripVolatileFields removes the VolatileFields paths from a Kubernetes
// resource JSON document and re-serializes it. encoding/json marshals map
// keys in sorted order, so the output — and therefore the hash — is stable
// regardless of the key order of the incoming document. Uses only the
// standard library (no gabs dependency).
func stripVolatileFields(payload string) (string, error) {
	var root any
	if err := json.Unmarshal([]byte(payload), &root); err != nil {
		return "", fmt.Errorf("parsing payload JSON: %w", err)
	}

	// Remove each volatile path; missing or non-object intermediate
	// steps are silently ignored, matching the previous behavior.
	for _, fieldPath := range VolatileFields {
		deleteJSONPath(root, fieldPath)
	}

	out, err := json.Marshal(root)
	if err != nil {
		return "", fmt.Errorf("re-serializing payload JSON: %w", err)
	}
	return string(out), nil
}

// deleteJSONPath removes the value at path from a decoded JSON document.
// It is a no-op when the document (or any intermediate step) is not an
// object or the path does not exist.
func deleteJSONPath(root any, path []string) {
	if len(path) == 0 {
		return
	}
	cur, ok := root.(map[string]any)
	if !ok {
		return
	}
	for _, key := range path[:len(path)-1] {
		next, ok := cur[key].(map[string]any)
		if !ok {
			return
		}
		cur = next
	}
	delete(cur, path[len(path)-1])
}
+ * SPDX-License-Identifier: BSD-3-Clause + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package kubeextractor + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestComputePayloadHash_StableForSamePayload(t *testing.T) { + payload := `{ + "metadata": { + "name": "test-pod", + "namespace": "default", + "resourceVersion": "12345" + }, + "spec": { + "containers": [ + { + "name": "app", + "image": "app:v1" + } + ] + } +}` + + hash1, err := ComputePayloadHash(payload) + assert.NoError(t, err) + + hash2, err := ComputePayloadHash(payload) + assert.NoError(t, err) + + assert.Equal(t, hash1, hash2, "Hash should be stable for identical payloads") +} + +func TestComputePayloadHash_IgnoresResourceVersion(t *testing.T) { + payload1 := `{ + "metadata": { + "name": "test-pod", + "namespace": "default", + "resourceVersion": "12345" + }, + "spec": { + "containers": [ + { + "name": "app", + "image": "app:v1" + } + ] + } +}` + + payload2 := `{ + "metadata": { + "name": "test-pod", + "namespace": "default", + "resourceVersion": "99999" + }, + "spec": { + "containers": [ + { + "name": "app", + "image": "app:v1" + } + ] + } +}` + + hash1, err := ComputePayloadHash(payload1) + assert.NoError(t, err) + + hash2, err := ComputePayloadHash(payload2) + assert.NoError(t, err) + + assert.Equal(t, hash1, hash2, "Hash should ignore resourceVersion changes") +} + +func TestComputePayloadHash_IgnoresStatusConditions(t *testing.T) { + payload1 := `{ + "metadata": { + "name": "test-pod", + "namespace": "default" + }, + "spec": { + "containers": [ + { + "name": "app", + "image": "app:v1" + } + ] + }, + "status": { + "phase": "Running", + "conditions": [ + { + "type": "Ready", + "status": "True", + "lastTransitionTime": "2026-04-01T10:00:00Z" + } + ] + } +}` + + payload2 := `{ + "metadata": { + "name": "test-pod", + "namespace": "default" + }, + "spec": { + "containers": [ + { + "name": "app", + 
"image": "app:v1" + } + ] + }, + "status": { + "phase": "Running", + "conditions": [ + { + "type": "Ready", + "status": "True", + "lastTransitionTime": "2026-04-02T15:30:00Z" + } + ] + } +}` + + hash1, err := ComputePayloadHash(payload1) + assert.NoError(t, err) + + hash2, err := ComputePayloadHash(payload2) + assert.NoError(t, err) + + assert.Equal(t, hash1, hash2, "Hash should ignore status conditions and timestamps") +} + +func TestComputePayloadHash_DetectsMeaningfulChanges(t *testing.T) { + payloadBefore := `{ + "metadata": { + "name": "test-pod", + "namespace": "default" + }, + "spec": { + "containers": [ + { + "name": "app", + "image": "app:v1" + } + ] + } +}` + + payloadAfter := `{ + "metadata": { + "name": "test-pod", + "namespace": "default" + }, + "spec": { + "containers": [ + { + "name": "app", + "image": "app:v2" + } + ] + } +}` + + hashBefore, err := ComputePayloadHash(payloadBefore) + assert.NoError(t, err) + + hashAfter, err := ComputePayloadHash(payloadAfter) + assert.NoError(t, err) + + assert.NotEqual(t, hashBefore, hashAfter, "Hash should differ for meaningful spec changes") +} + +func TestComputePayloadHash_InvalidJSON(t *testing.T) { + invalidPayload := `{invalid json` + + _, err := ComputePayloadHash(invalidPayload) + assert.Error(t, err, "Should error on invalid JSON") +} + +func TestStripVolatileFields_RemovesResourceVersion(t *testing.T) { + payload := `{ + "metadata": { + "name": "test", + "resourceVersion": "12345" + }, + "spec": { + "image": "img:v1" + } +}` + + cleaned, err := stripVolatileFields(payload) + assert.NoError(t, err) + assert.NotContains(t, cleaned, "12345", "resourceVersion should be removed") + assert.Contains(t, cleaned, "test", "name should remain") + assert.Contains(t, cleaned, "img:v1", "spec.image should remain") +} + +func TestStripVolatileFields_PreservesSpec(t *testing.T) { + payload := `{ + "metadata": { + "name": "test-pod" + }, + "spec": { + "replicas": 3, + "template": { + "spec": { + "containers": [ + { + 
"name": "app", + "image": "app:latest" + } + ] + } + } + } +}` + + cleaned, err := stripVolatileFields(payload) + assert.NoError(t, err) + assert.Contains(t, cleaned, "replicas", "spec.replicas should be preserved") + assert.Contains(t, cleaned, "app:latest", "container image should be preserved") +} diff --git a/pkg/sloop/processing/dedup.go b/pkg/sloop/processing/dedup.go new file mode 100644 index 00000000..5601f974 --- /dev/null +++ b/pkg/sloop/processing/dedup.go @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2019, salesforce.com, inc. + * All rights reserved. + * SPDX-License-Identifier: BSD-3-Clause + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package processing + +import ( + "fmt" + "sync" + "time" + + "github.com/golang/glog" +) + +// DedupEntry holds state for a single resource's deduplication +type DedupEntry struct { + payloadHash uint64 + lastWriteAt time.Time + resourceType string + namespace string + name string +} + +// DedupState manages in-memory deduplication state for all resources. +// It decides whether incoming watch events should be written to the watch table. +type DedupState struct { + mu sync.RWMutex + entries map[string]*DedupEntry + snapshotInterval time.Duration + enabled bool + skippedCount int64 + snapshotCount int64 + changedCount int64 + addedCount int64 + deletedCount int64 +} + +// NewDedupState creates a new deduplication state manager +func NewDedupState(enabled bool, snapshotInterval time.Duration) *DedupState { + return &DedupState{ + entries: make(map[string]*DedupEntry), + snapshotInterval: snapshotInterval, + enabled: enabled, + } +} + +// resourceKey generates a unique key for a resource +func (ds *DedupState) resourceKey(resourceType, namespace, name string) string { + return fmt.Sprintf("%s/%s/%s", resourceType, namespace, name) +} + +// ShouldWrite determines if an event should be written to the watch table. 
+// Returns (shouldWrite bool, reason string) for compatibility with watch.go +func (ds *DedupState) ShouldWrite(namespace, resourceType, name string, payloadHash uint64, eventTime time.Time, snapshotInterval time.Duration) (bool, string) { + if !ds.enabled { + return true, "dedup_disabled" + } + + ds.mu.Lock() + defer ds.mu.Unlock() + + key := ds.resourceKey(resourceType, namespace, name) + + // For UPDATE events, check if the payload has changed + entry, exists := ds.entries[key] + if !exists { + // First time seeing this resource + ds.entries[key] = &DedupEntry{ + payloadHash: payloadHash, + lastWriteAt: eventTime, + resourceType: resourceType, + namespace: namespace, + name: name, + } + ds.changedCount++ + return true, "changed" + } + + // Resource has been seen before + if entry.payloadHash != payloadHash { + // Payload hash changed, meaningful update + entry.payloadHash = payloadHash + entry.lastWriteAt = eventTime + ds.changedCount++ + glog.V(2).Infof("Dedup: writing %s/%s/%s (hash changed)", resourceType, namespace, name) + return true, "changed" + } + + // Hash is the same, check if enough time has passed for a snapshot + timeSinceLastWrite := eventTime.Sub(entry.lastWriteAt) + if timeSinceLastWrite >= snapshotInterval { + entry.lastWriteAt = eventTime + ds.snapshotCount++ + glog.V(2).Infof("Dedup: writing %s/%s/%s (snapshot, %v since last write)", resourceType, namespace, name, timeSinceLastWrite) + return true, "snapshot" + } + + // No change and within snapshot interval, skip it + ds.skippedCount++ + glog.V(3).Infof("Dedup: skipping %s/%s/%s (hash unchanged, %v since last write)", resourceType, namespace, name, timeSinceLastWrite) + return false, "skipped" +} + +// GetStats returns a snapshot of current dedup statistics +func (ds *DedupState) GetStats() map[string]int64 { + ds.mu.RLock() + defer ds.mu.RUnlock() + + return map[string]int64{ + "skipped": ds.skippedCount, + "snapshot": ds.snapshotCount, + "changed": ds.changedCount, + "added": 
ds.addedCount, + "deleted": ds.deletedCount, + "tracked": int64(len(ds.entries)), + } +} + +// Reset clears all dedup state (useful for testing) +func (ds *DedupState) Reset() { + ds.mu.Lock() + defer ds.mu.Unlock() + + ds.entries = make(map[string]*DedupEntry) + ds.skippedCount = 0 + ds.snapshotCount = 0 + ds.changedCount = 0 + ds.addedCount = 0 + ds.deletedCount = 0 +} diff --git a/pkg/sloop/processing/dedup_test.go b/pkg/sloop/processing/dedup_test.go new file mode 100644 index 00000000..e2cb1c90 --- /dev/null +++ b/pkg/sloop/processing/dedup_test.go @@ -0,0 +1,192 @@ +/* + * Copyright (c) 2019, salesforce.com, inc. + * All rights reserved. + * SPDX-License-Identifier: BSD-3-Clause + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package processing + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestDedupState_Disabled(t *testing.T) { + ds := NewDedupState(false, 30*time.Minute) + now := time.Now() + + shouldWrite, reason := ds.ShouldWrite("default", "Pod", "test", 12345, now, 30*time.Minute) + assert.Equal(t, true, shouldWrite, "Should always write when dedup disabled") + assert.Equal(t, "dedup_disabled", reason) + + shouldWrite, reason = ds.ShouldWrite("default", "Pod", "test", 12345, now, 30*time.Minute) + assert.Equal(t, true, shouldWrite, "Should always write when dedup disabled") + assert.Equal(t, "dedup_disabled", reason) +} + +func TestDedupState_FirstWrite_AlwaysWrites(t *testing.T) { + ds := NewDedupState(true, 30*time.Minute) + now := time.Now() + + shouldWrite, reason := ds.ShouldWrite("default", "Pod", "test", 12345, now, 30*time.Minute) + assert.Equal(t, true, shouldWrite) + assert.Equal(t, "changed", reason) + + stats := ds.GetStats() + assert.Equal(t, int64(1), stats["changed"]) +} + +func TestDedupState_HashChanged_Writes(t *testing.T) { + ds := NewDedupState(true, 30*time.Minute) + now := time.Now() + interval := 30 * time.Minute + + // 
First event + shouldWrite, reason := ds.ShouldWrite("default", "Pod", "test", 11111, now, interval) + assert.Equal(t, true, shouldWrite) + assert.Equal(t, "changed", reason) + + // Same hash 1 minute later + shouldWrite, reason = ds.ShouldWrite("default", "Pod", "test", 11111, now.Add(1*time.Minute), interval) + assert.Equal(t, false, shouldWrite) + assert.Equal(t, "skipped", reason) + + // Event with different hash + shouldWrite, reason = ds.ShouldWrite("default", "Pod", "test", 22222, now.Add(2*time.Minute), interval) + assert.Equal(t, true, shouldWrite) + assert.Equal(t, "changed", reason) + + stats := ds.GetStats() + assert.Equal(t, int64(1), stats["skipped"]) + assert.Equal(t, int64(2), stats["changed"]) +} + +func TestDedupState_SnapshotInterval_WritesAfterInterval(t *testing.T) { + interval := 30 * time.Minute + ds := NewDedupState(true, interval) + now := time.Now() + + // First event + shouldWrite, reason := ds.ShouldWrite("default", "Pod", "test", 11111, now, interval) + assert.Equal(t, true, shouldWrite) + assert.Equal(t, "changed", reason) + + // Same hash at 15 minutes + shouldWrite, reason = ds.ShouldWrite("default", "Pod", "test", 11111, now.Add(15*time.Minute), interval) + assert.Equal(t, false, shouldWrite) + assert.Equal(t, "skipped", reason) + + // Same hash at 31 minutes (beyond interval) + shouldWrite, reason = ds.ShouldWrite("default", "Pod", "test", 11111, now.Add(31*time.Minute), interval) + assert.Equal(t, true, shouldWrite) + assert.Equal(t, "snapshot", reason) + + // Same hash at 32 minutes (just after interval) + shouldWrite, reason = ds.ShouldWrite("default", "Pod", "test", 11111, now.Add(32*time.Minute), interval) + assert.Equal(t, false, shouldWrite) + assert.Equal(t, "skipped", reason) + + // Same hash at 62 minutes (another interval) + shouldWrite, reason = ds.ShouldWrite("default", "Pod", "test", 11111, now.Add(62*time.Minute), interval) + assert.Equal(t, true, shouldWrite) + assert.Equal(t, "snapshot", reason) + + stats := 
ds.GetStats() + assert.Equal(t, int64(2), stats["skipped"]) + assert.Equal(t, int64(2), stats["snapshot"]) + assert.Equal(t, int64(1), stats["changed"]) +} + +func TestDedupState_MultipleResources_Tracked(t *testing.T) { + ds := NewDedupState(true, 30*time.Minute) + now := time.Now() + interval := 30 * time.Minute + + // Add 3 different resources + ds.ShouldWrite("default", "Pod", "pod1", 11111, now, interval) + ds.ShouldWrite("default", "Pod", "pod2", 22222, now, interval) + ds.ShouldWrite("kube-system", "Deployment", "coredns", 33333, now, interval) + + stats := ds.GetStats() + assert.Equal(t, int64(3), stats["tracked"]) + assert.Equal(t, int64(3), stats["changed"]) +} + +func TestDedupState_DifferentNamespacesSeparate(t *testing.T) { + ds := NewDedupState(true, 30*time.Minute) + now := time.Now() + interval := 30 * time.Minute + + // Same name, different namespace + ds.ShouldWrite("default", "Pod", "test", 11111, now, interval) + shouldWrite, _ := ds.ShouldWrite("kube-system", "Pod", "test", 11111, now, interval) + + // Should be treated as different resources + assert.Equal(t, true, shouldWrite) + + stats := ds.GetStats() + assert.Equal(t, int64(2), stats["tracked"]) +} + +func TestDedupState_DifferentKindsSeparate(t *testing.T) { + ds := NewDedupState(true, 30*time.Minute) + now := time.Now() + interval := 30 * time.Minute + + // Same name and namespace, different kind + ds.ShouldWrite("default", "Pod", "test", 11111, now, interval) + shouldWrite, _ := ds.ShouldWrite("default", "Service", "test", 11111, now, interval) + + // Should be treated as different resources + assert.Equal(t, true, shouldWrite) + + stats := ds.GetStats() + assert.Equal(t, int64(2), stats["tracked"]) +} + +func TestDedupState_FirstEventIsWrite(t *testing.T) { + ds := NewDedupState(true, 30*time.Minute) + now := time.Now() + + shouldWrite, reason := ds.ShouldWrite("default", "Pod", "new-resource", 99999, now, 30*time.Minute) + assert.Equal(t, true, shouldWrite, "First UPDATE for a 
resource should always write") + assert.Equal(t, "changed", reason) + + stats := ds.GetStats() + assert.Equal(t, int64(1), stats["changed"]) + assert.Equal(t, int64(1), stats["tracked"]) +} + +func TestDedupState_Reset(t *testing.T) { + ds := NewDedupState(true, 30*time.Minute) + now := time.Now() + interval := 30 * time.Minute + + ds.ShouldWrite("default", "Pod", "test", 11111, now, interval) + stats1 := ds.GetStats() + assert.Equal(t, int64(1), stats1["tracked"]) + + ds.Reset() + stats2 := ds.GetStats() + assert.Equal(t, int64(0), stats2["tracked"]) + assert.Equal(t, int64(0), stats2["changed"]) +} + +func TestDedupState_GetStats(t *testing.T) { + ds := NewDedupState(true, 30*time.Minute) + now := time.Now() + interval := 30 * time.Minute + + ds.ShouldWrite("default", "Pod", "test1", 11111, now, interval) + ds.ShouldWrite("default", "Pod", "test1", 11111, now.Add(1*time.Minute), interval) + ds.ShouldWrite("default", "Pod", "test1", 11111, now.Add(2*time.Minute), interval) + ds.ShouldWrite("default", "Pod", "test1", 22222, now.Add(3*time.Minute), interval) + + stats := ds.GetStats() + assert.Equal(t, int64(2), stats["skipped"]) + assert.Equal(t, int64(2), stats["changed"]) + assert.Equal(t, int64(1), stats["tracked"]) +} diff --git a/pkg/sloop/processing/eventcount_test.go b/pkg/sloop/processing/eventcount_test.go index 1b54ed08..633acbb6 100644 --- a/pkg/sloop/processing/eventcount_test.go +++ b/pkg/sloop/processing/eventcount_test.go @@ -242,7 +242,7 @@ func addEventCount(t *testing.T, tables typed.Tables, timeStamp *timestamp.Times metadata := &kubeextractor.KubeMetadata{Name: "someName", Namespace: "someNamespace"} err = tables.Db().Update(func(txn badgerwrap.Txn) error { - updateKubeWatchTable(tables, txn, &watchRec, metadata, true) + updateKubeWatchTable(tables, txn, &watchRec, metadata, true, nil, 30*time.Minute) // For dedupe to work we need a record written to the watch table err2 := updateEventCountTable(tables, txn, &watchRec, &resourceMetadata, 
&involvedObject, someMaxLookback) if err2 != nil { @@ -251,7 +251,7 @@ func addEventCount(t *testing.T, tables typed.Tables, timeStamp *timestamp.Times kubeMetadata, err := kubeextractor.ExtractMetadata(watchRec.Payload) assert.Nil(t, err) - err2 = updateKubeWatchTable(tables, txn, &watchRec, &kubeMetadata, false) + err2 = updateKubeWatchTable(tables, txn, &watchRec, &kubeMetadata, false, nil, 30*time.Minute) return err2 }) assert.Nil(t, err) @@ -523,7 +523,7 @@ func Test_updateEventCountTable_NoUid_Success(t *testing.T) { watchRec := typed.KubeWatchResult{Kind: kubeextractor.PodKind, WatchType: typed.KubeWatchResult_UPDATE, Timestamp: ts, Payload: someNamePodPayload} metadata := &kubeextractor.KubeMetadata{Name: "somePodName", Namespace: "someNamespace"} err = tables.Db().Update(func(txn badgerwrap.Txn) error { - return updateKubeWatchTable(tables, txn, &watchRec, metadata, true) + return updateKubeWatchTable(tables, txn, &watchRec, metadata, true, nil, 30*time.Minute) }) assert.Nil(t, err) @@ -562,7 +562,7 @@ func Test_updateEventCountTable_NoUid_Failure(t *testing.T) { watchRec := typed.KubeWatchResult{Kind: kubeextractor.PodKind, WatchType: typed.KubeWatchResult_UPDATE, Timestamp: ts, Payload: somePodPayload} metadata := &kubeextractor.KubeMetadata{Name: "RandomName", Namespace: "someNamespace"} err = tables.Db().Update(func(txn badgerwrap.Txn) error { - return updateKubeWatchTable(tables, txn, &watchRec, metadata, true) + return updateKubeWatchTable(tables, txn, &watchRec, metadata, true, nil, 30*time.Minute) }) assert.Nil(t, err) diff --git a/pkg/sloop/processing/processing.go b/pkg/sloop/processing/processing.go index 18a0224e..19664cf2 100644 --- a/pkg/sloop/processing/processing.go +++ b/pkg/sloop/processing/processing.go @@ -20,21 +20,38 @@ import ( ) type Runner struct { - kubeWatchChan chan typed.KubeWatchResult - tables typed.Tables - inputWg *sync.WaitGroup - keepMinorNodeUpdates bool - maxLookback time.Duration + kubeWatchChan chan 
typed.KubeWatchResult + tables typed.Tables + inputWg *sync.WaitGroup + keepMinorNodeUpdates bool + maxLookback time.Duration + dedupState *DedupState + dedupSnapshotInterval time.Duration } var ( metricProcessingWatchtableUpdatecount = promauto.NewCounter(prometheus.CounterOpts{Name: "sloop_processing_watchtable_updatecount"}) metricIngestionFailureCount = promauto.NewCounter(prometheus.CounterOpts{Name: "sloop_ingestion_failure_count"}) metricIngestionSuccessCount = promauto.NewCounter(prometheus.CounterOpts{Name: "sloop_ingestion_success_count"}) + metricDedupSkippedCount = promauto.NewCounter(prometheus.CounterOpts{Name: "sloop_dedup_skipped_count", Help: "Number of payload writes skipped due to deduplication"}) + metricDedupSnapshotCount = promauto.NewCounter(prometheus.CounterOpts{Name: "sloop_dedup_snapshot_count", Help: "Number of snapshot writes (same hash after interval)"}) + metricDedupChangedCount = promauto.NewCounter(prometheus.CounterOpts{Name: "sloop_dedup_changed_count", Help: "Number of changed payload writes"}) ) -func NewProcessing(kubeWatchChan chan typed.KubeWatchResult, tables typed.Tables, keepMinorNodeUpdates bool, maxLookback time.Duration) *Runner { - return &Runner{kubeWatchChan: kubeWatchChan, tables: tables, inputWg: &sync.WaitGroup{}, keepMinorNodeUpdates: keepMinorNodeUpdates, maxLookback: maxLookback} +func NewProcessing(kubeWatchChan chan typed.KubeWatchResult, tables typed.Tables, keepMinorNodeUpdates bool, maxLookback time.Duration, enablePayloadDedup bool, dedupSnapshotInterval time.Duration) *Runner { + var dedupState *DedupState + if enablePayloadDedup { + dedupState = NewDedupState(true, dedupSnapshotInterval) + } + return &Runner{ + kubeWatchChan: kubeWatchChan, + tables: tables, + inputWg: &sync.WaitGroup{}, + keepMinorNodeUpdates: keepMinorNodeUpdates, + maxLookback: maxLookback, + dedupState: dedupState, + dedupSnapshotInterval: dedupSnapshotInterval, + } } func (r *Runner) processingFailed(name string, err error) { @@ 
-79,7 +96,7 @@ func (r *Runner) Start() { } err = r.tables.Db().Update(func(txn badgerwrap.Txn) error { - return updateKubeWatchTable(r.tables, txn, &watchRec, &resourceMetadata, r.keepMinorNodeUpdates) + return updateKubeWatchTable(r.tables, txn, &watchRec, &resourceMetadata, r.keepMinorNodeUpdates, r.dedupState, r.dedupSnapshotInterval) }) if err != nil { r.processingFailed("updateKubeWatchTable", err) diff --git a/pkg/sloop/processing/watch.go b/pkg/sloop/processing/watch.go index f7501288..3e958e82 100644 --- a/pkg/sloop/processing/watch.go +++ b/pkg/sloop/processing/watch.go @@ -83,7 +83,7 @@ func doesNodeHaveMajorUpdates(tables typed.Tables, txn badgerwrap.Txn, watchRec return diff, nil } -func updateKubeWatchTable(tables typed.Tables, txn badgerwrap.Txn, watchRec *typed.KubeWatchResult, metadata *kubeextractor.KubeMetadata, keepMinorNodeUpdates bool) error { +func updateKubeWatchTable(tables typed.Tables, txn badgerwrap.Txn, watchRec *typed.KubeWatchResult, metadata *kubeextractor.KubeMetadata, keepMinorNodeUpdates bool, dedupState *DedupState, snapshotInterval time.Duration) error { metricProcessingWatchtableUpdatecount.Inc() key, err := toWatchTableKey(watchRec.Timestamp, watchRec.Kind, metadata.Namespace, metadata.Name) @@ -102,6 +102,27 @@ func updateKubeWatchTable(tables typed.Tables, txn badgerwrap.Txn, watchRec *typ } } + if dedupState != nil && watchRec.Kind != kubeextractor.NodeKind { + hash, err := kubeextractor.ComputePayloadHash(watchRec.Payload) + if err != nil { + return errors.Wrap(err, "Failed to compute payload hash") + } + + now := time.Now() + shouldWrite, reason := dedupState.ShouldWrite(metadata.Namespace, watchRec.Kind, metadata.Name, hash, now, snapshotInterval) + if !shouldWrite { + glog.V(2).Infof("Dedup skip %s/%s (hash unchanged, reason: %s)", watchRec.Kind, metadata.Name, reason) + metricDedupSkippedCount.Inc() + return nil + } + + if reason == "snapshot" { + metricDedupSnapshotCount.Inc() + } else if reason == "changed" { + 
metricDedupChangedCount.Inc() + } + } + err = tables.WatchTable().Set(txn, key.String(), watchRec) if err != nil { return errors.Wrap(err, "Put failed") diff --git a/pkg/sloop/processing/watch_test.go b/pkg/sloop/processing/watch_test.go index 6b20b73e..29ad9e81 100644 --- a/pkg/sloop/processing/watch_test.go +++ b/pkg/sloop/processing/watch_test.go @@ -102,7 +102,7 @@ func helper_runWatchTableProcessingOnInputs(t *testing.T, inRecs []*typed.KubeWa kubeMetadata, err := kubeextractor.ExtractMetadata(watchRec.Payload) assert.Nil(t, err) - return updateKubeWatchTable(tables, txn, watchRec, &kubeMetadata, keepMinorNodeUpdates) + return updateKubeWatchTable(tables, txn, watchRec, &kubeMetadata, keepMinorNodeUpdates, nil, 30*time.Minute) }) assert.Nil(t, err) } @@ -216,7 +216,7 @@ func Test_getLastKubeWatchResult(t *testing.T) { watchRec := typed.KubeWatchResult{Kind: kubeextractor.NodeKind, WatchType: typed.KubeWatchResult_UPDATE, Timestamp: ts, Payload: somePodPayload} metadata := &kubeextractor.KubeMetadata{Name: "someName", Namespace: "someNamespace"} err = tables.Db().Update(func(txn badgerwrap.Txn) error { - return updateKubeWatchTable(tables, txn, &watchRec, metadata, true) + return updateKubeWatchTable(tables, txn, &watchRec, metadata, true, nil, 30*time.Minute) }) assert.Nil(t, err) @@ -246,7 +246,7 @@ func Test_GetUidForWatchEntry(t *testing.T) { watchRec := typed.KubeWatchResult{Kind: kubeextractor.PodKind, WatchType: typed.KubeWatchResult_UPDATE, Timestamp: ts, Payload: somePodPayload} metadata := &kubeextractor.KubeMetadata{Name: "someName", Namespace: "someNamespace"} err = tables.Db().Update(func(txn badgerwrap.Txn) error { - return updateKubeWatchTable(tables, txn, &watchRec, metadata, true) + return updateKubeWatchTable(tables, txn, &watchRec, metadata, true, nil, 30*time.Minute) }) assert.Nil(t, err) diff --git a/pkg/sloop/processing/watchactivity_test.go b/pkg/sloop/processing/watchactivity_test.go index 28e43395..a4e52f4d 100644 --- 
a/pkg/sloop/processing/watchactivity_test.go +++ b/pkg/sloop/processing/watchactivity_test.go @@ -88,7 +88,7 @@ func Test_updateWatchActivityTable(t *testing.T) { // add a KubeWatchResult err = tables.Db().Update(func(txn badgerwrap.Txn) error { - return updateKubeWatchTable(tables, txn, watchRec, &metadata, true) + return updateKubeWatchTable(tables, txn, watchRec, &metadata, true, nil, 30*time.Minute) }) assert.Nil(t, err) diff --git a/pkg/sloop/server/internal/config/config.go b/pkg/sloop/server/internal/config/config.go index 9b37af5b..dbd86fa7 100644 --- a/pkg/sloop/server/internal/config/config.go +++ b/pkg/sloop/server/internal/config/config.go @@ -82,6 +82,9 @@ type SloopConfig struct { EnableUserMetrics bool `json:"enableUserMetrics"` PrivilegedAccess bool `json:"PrivilegedAccess"` BadgerDetailLogEnabled bool `json:"badgerDetailLogEnabled"` + EnablePayloadDedup bool `json:"enablePayloadDedup"` + DedupSnapshotInterval time.Duration `json:"dedupSnapshotInterval"` + EnableValueCompression bool `json:"enableValueCompression"` } func registerFlags(fs *flag.FlagSet, config *SloopConfig) { @@ -133,6 +136,9 @@ func registerFlags(fs *flag.FlagSet, config *SloopConfig) { fs.BoolVar(&config.BadgerVLogFileIOMapping, "badger-vlog-fileIO-mapping", config.BadgerVLogFileIOMapping, "Indicates which file loading mode should be used for the value log data, in memory constrained environments the value is recommended to be true") fs.BoolVar(&config.BadgerVLogTruncate, "badger-vlog-truncate", config.BadgerVLogTruncate, "Truncate value log if badger db offset is different from badger db size") fs.BoolVar(&config.BadgerDetailLogEnabled, "badger-detail-log-enabled", config.BadgerDetailLogEnabled, "Turns on detailed logging of BadgerDB") + fs.BoolVar(&config.EnablePayloadDedup, "enable-payload-dedup", config.EnablePayloadDedup, "Enable payload deduplication to skip unchanged resource writes") + fs.DurationVar(&config.DedupSnapshotInterval, "dedup-snapshot-interval", 
config.DedupSnapshotInterval, "Snapshot interval for dedup") + fs.BoolVar(&config.EnableValueCompression, "enable-value-compression", config.EnableValueCompression, "Enable zstd compression for stored values (reduces disk usage by 50-60%%)") } func getDefaultConfig() *SloopConfig { @@ -184,6 +190,9 @@ func getDefaultConfig() *SloopConfig { EnableUserMetrics: false, PrivilegedAccess: true, BadgerDetailLogEnabled: false, + EnablePayloadDedup: false, + DedupSnapshotInterval: 30 * time.Minute, + EnableValueCompression: false, ExclusionRules: map[string][]any{}, } return &defaultConfig @@ -231,6 +240,13 @@ func (c *SloopConfig) Validate() error { return fmt.Errorf("CleanupFrequency can not be less than 15 minutes. Badger is lazy about freeing space " + "on disk so we need to give it time to avoid over-correction") } + if c.DedupSnapshotInterval < 0 { + return fmt.Errorf("DedupSnapshotInterval can not be negative, got %v", c.DedupSnapshotInterval) + } + if c.EnablePayloadDedup && c.DedupSnapshotInterval == 0 { + return fmt.Errorf("DedupSnapshotInterval must be > 0 when EnablePayloadDedup is true " + + "(otherwise no periodic snapshot is taken and data is never refreshed on disk)") + } return nil } diff --git a/pkg/sloop/server/server.go b/pkg/sloop/server/server.go index a33a0949..868677c0 100644 --- a/pkg/sloop/server/server.go +++ b/pkg/sloop/server/server.go @@ -46,6 +46,11 @@ func RealMain() error { return errors.Wrap(err, "config validation failed") } + typed.ValueCompressionEnabled = conf.EnableValueCompression + if conf.EnableValueCompression { + glog.Infof("Value compression enabled (zstd)") + } + kubeContext, err := ingress.GetKubernetesContext(conf.ApiServerHost, conf.UseKubeContext, conf.PrivilegedAccess) if err != nil { return errors.Wrap(err, "failed to get kubernetes context") @@ -93,7 +98,7 @@ func RealMain() error { } tables := typed.NewTableList(db) - processor := processing.NewProcessing(kubeWatchChan, tables, conf.KeepMinorNodeUpdates, 
conf.MaxLookback) + processor := processing.NewProcessing(kubeWatchChan, tables, conf.KeepMinorNodeUpdates, conf.MaxLookback, conf.EnablePayloadDedup, conf.DedupSnapshotInterval) processor.Start() // Real kubernetes watcher diff --git a/pkg/sloop/store/typed/compression.go b/pkg/sloop/store/typed/compression.go new file mode 100644 index 00000000..1d8ad71a --- /dev/null +++ b/pkg/sloop/store/typed/compression.go @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2019, salesforce.com, inc. + * All rights reserved. + * SPDX-License-Identifier: BSD-3-Clause + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package typed + +import ( + "sync" + + "github.com/klauspost/compress/zstd" +) + +var ( + zstdEncoder *zstd.Encoder + zstdDecoder *zstd.Decoder + zstdInitOnce sync.Once + zstdInitErr error +) + +// ValueCompressionEnabled controls whether new writes are compressed. +// Reads always auto-detect compressed vs raw data for backward compatibility. +var ValueCompressionEnabled bool + +var zstdMagic = [4]byte{0x28, 0xB5, 0x2F, 0xFD} + +func initZstd() { + zstdInitOnce.Do(func() { + zstdEncoder, zstdInitErr = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault)) + if zstdInitErr != nil { + return + } + var dec *zstd.Decoder + dec, zstdInitErr = zstd.NewReader(nil, zstd.WithDecoderConcurrency(0)) + if zstdInitErr != nil { + return + } + zstdDecoder = dec + }) +} + +// CompressValue compresses data with zstd if compression is enabled. +// Returns raw data unchanged when compression is disabled. +func CompressValue(data []byte) ([]byte, error) { + if !ValueCompressionEnabled { + return data, nil + } + initZstd() + if zstdInitErr != nil { + return nil, zstdInitErr + } + return zstdEncoder.EncodeAll(data, make([]byte, 0, len(data))), nil +} + +// DecompressValue auto-detects zstd-compressed data via the magic header +// and decompresses it. Uncompressed data is returned as-is. 
+func DecompressValue(data []byte) ([]byte, error) { + if len(data) < 4 { + return data, nil + } + if data[0] != zstdMagic[0] || data[1] != zstdMagic[1] || data[2] != zstdMagic[2] || data[3] != zstdMagic[3] { + return data, nil + } + initZstd() + if zstdInitErr != nil { + return nil, zstdInitErr + } + return zstdDecoder.DecodeAll(data, nil) +} + +// IsCompressed checks if data has the zstd magic header. +func IsCompressed(data []byte) bool { + if len(data) < 4 { + return false + } + return data[0] == zstdMagic[0] && data[1] == zstdMagic[1] && data[2] == zstdMagic[2] && data[3] == zstdMagic[3] +} diff --git a/pkg/sloop/store/typed/compression_test.go b/pkg/sloop/store/typed/compression_test.go new file mode 100644 index 00000000..55027f51 --- /dev/null +++ b/pkg/sloop/store/typed/compression_test.go @@ -0,0 +1,103 @@ +package typed + +import ( + "bytes" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCompressDecompress_RoundTrip(t *testing.T) { + ValueCompressionEnabled = true + defer func() { ValueCompressionEnabled = false }() + + original := []byte(strings.Repeat(`{"kind":"Pod","metadata":{"name":"test-pod","namespace":"default"},"spec":{"containers":[{"name":"app","image":"nginx:latest"}]}}`, 5)) + + compressed, err := CompressValue(original) + assert.NoError(t, err) + assert.True(t, IsCompressed(compressed)) + assert.True(t, len(compressed) < len(original), "compressed should be smaller for non-trivial payloads") + + decompressed, err := DecompressValue(compressed) + assert.NoError(t, err) + assert.True(t, bytes.Equal(original, decompressed)) +} + +func TestDecompress_UncompressedData(t *testing.T) { + raw := []byte(`{"kind":"Pod","metadata":{"name":"test"}}`) + + result, err := DecompressValue(raw) + assert.NoError(t, err) + assert.True(t, bytes.Equal(raw, result), "uncompressed data should pass through unchanged") +} + +func TestCompress_DisabledPassesThrough(t *testing.T) { + ValueCompressionEnabled = false + + original := 
[]byte(`{"kind":"Pod"}`) + result, err := CompressValue(original) + assert.NoError(t, err) + assert.True(t, bytes.Equal(original, result), "should return original when compression disabled") + assert.False(t, IsCompressed(result)) +} + +func TestCompressDecompress_LargePayload(t *testing.T) { + ValueCompressionEnabled = true + defer func() { ValueCompressionEnabled = false }() + + large := []byte(strings.Repeat(`{"kind":"Pod","metadata":{"name":"test-pod","namespace":"default","labels":{"app":"test","version":"v1"}}}`, 100)) + + compressed, err := CompressValue(large) + assert.NoError(t, err) + ratio := float64(len(compressed)) / float64(len(large)) + t.Logf("Original: %d bytes, Compressed: %d bytes, Ratio: %.2f%%", len(large), len(compressed), ratio*100) + assert.True(t, ratio < 0.15, "highly repetitive JSON should compress to <15%% of original") + + decompressed, err := DecompressValue(compressed) + assert.NoError(t, err) + assert.True(t, bytes.Equal(large, decompressed)) +} + +func TestDecompress_SmallData(t *testing.T) { + tiny := []byte{0x08, 0x01} + result, err := DecompressValue(tiny) + assert.NoError(t, err) + assert.True(t, bytes.Equal(tiny, result), "small data should pass through") +} + +func TestDecompress_EmptyData(t *testing.T) { + result, err := DecompressValue([]byte{}) + assert.NoError(t, err) + assert.Empty(t, result) +} + +func TestIsCompressed(t *testing.T) { + assert.False(t, IsCompressed([]byte{})) + assert.False(t, IsCompressed([]byte{0x08, 0x01})) + assert.False(t, IsCompressed([]byte{0x28, 0xB5, 0x2F})) + assert.True(t, IsCompressed([]byte{0x28, 0xB5, 0x2F, 0xFD, 0x00})) +} + +func BenchmarkCompressValue(b *testing.B) { + ValueCompressionEnabled = true + defer func() { ValueCompressionEnabled = false }() + + payload := []byte(strings.Repeat(`{"kind":"Pod","metadata":{"name":"test-pod","namespace":"default"}}`, 50)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = CompressValue(payload) + } +} + +func BenchmarkDecompressValue(b 
*testing.B) { + ValueCompressionEnabled = true + defer func() { ValueCompressionEnabled = false }() + + payload := []byte(strings.Repeat(`{"kind":"Pod","metadata":{"name":"test-pod","namespace":"default"}}`, 50)) + compressed, _ := CompressValue(payload) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = DecompressValue(compressed) + } +} diff --git a/pkg/sloop/store/typed/eventcounttablegen.go b/pkg/sloop/store/typed/eventcounttablegen.go index 3f4aef46..c029da60 100644 --- a/pkg/sloop/store/typed/eventcounttablegen.go +++ b/pkg/sloop/store/typed/eventcounttablegen.go @@ -46,6 +46,11 @@ func (t *ResourceEventCountsTable) Set(txn badgerwrap.Txn, key string, value *Re return errors.Wrapf(err, "protobuf marshal for table %v failed", t.tableName) } + outb, err = CompressValue(outb) + if err != nil { + return errors.Wrapf(err, "compression for table %v failed", t.tableName) + } + err = txn.Set([]byte(key), outb) if err != nil { return errors.Wrapf(err, "set for table %v failed", t.tableName) @@ -72,6 +77,11 @@ func (t *ResourceEventCountsTable) Get(txn badgerwrap.Txn, key string) (*Resourc return nil, errors.Wrapf(err, "value copy failed for table %v", t.tableName) } + valueBytes, err = DecompressValue(valueBytes) + if err != nil { + return nil, errors.Wrapf(err, "decompression failed for table %v", t.tableName) + } + retValue := &ResourceEventCounts{} err = proto.Unmarshal(valueBytes, retValue) if err != nil { @@ -273,6 +283,10 @@ func (t *ResourceEventCountsTable) RangeRead(txn badgerwrap.Txn, keyPrefix *Even if err != nil { return nil, stats, err } + valueBytes, err = DecompressValue(valueBytes) + if err != nil { + return nil, stats, err + } retValue := &ResourceEventCounts{} err = proto.Unmarshal(valueBytes, retValue) if err != nil { diff --git a/pkg/sloop/store/typed/resourcesummarytablegen.go b/pkg/sloop/store/typed/resourcesummarytablegen.go index 476c608e..16b99604 100644 --- a/pkg/sloop/store/typed/resourcesummarytablegen.go +++ 
b/pkg/sloop/store/typed/resourcesummarytablegen.go @@ -46,6 +46,11 @@ func (t *ResourceSummaryTable) Set(txn badgerwrap.Txn, key string, value *Resour return errors.Wrapf(err, "protobuf marshal for table %v failed", t.tableName) } + outb, err = CompressValue(outb) + if err != nil { + return errors.Wrapf(err, "compression for table %v failed", t.tableName) + } + err = txn.Set([]byte(key), outb) if err != nil { return errors.Wrapf(err, "set for table %v failed", t.tableName) @@ -72,6 +77,11 @@ func (t *ResourceSummaryTable) Get(txn badgerwrap.Txn, key string) (*ResourceSum return nil, errors.Wrapf(err, "value copy failed for table %v", t.tableName) } + valueBytes, err = DecompressValue(valueBytes) + if err != nil { + return nil, errors.Wrapf(err, "decompression failed for table %v", t.tableName) + } + retValue := &ResourceSummary{} err = proto.Unmarshal(valueBytes, retValue) if err != nil { @@ -273,6 +283,10 @@ func (t *ResourceSummaryTable) RangeRead(txn badgerwrap.Txn, keyPrefix *Resource if err != nil { return nil, stats, err } + valueBytes, err = DecompressValue(valueBytes) + if err != nil { + return nil, stats, err + } retValue := &ResourceSummary{} err = proto.Unmarshal(valueBytes, retValue) if err != nil { diff --git a/pkg/sloop/store/typed/tabletemplate.go b/pkg/sloop/store/typed/tabletemplate.go index 26370cf5..2f5355eb 100644 --- a/pkg/sloop/store/typed/tabletemplate.go +++ b/pkg/sloop/store/typed/tabletemplate.go @@ -44,6 +44,11 @@ func (t *ValueTypeTable) Set(txn badgerwrap.Txn, key string, value *ValueType) e return errors.Wrapf(err, "protobuf marshal for table %v failed", t.tableName) } + outb, err = CompressValue(outb) + if err != nil { + return errors.Wrapf(err, "compression for table %v failed", t.tableName) + } + err = txn.Set([]byte(key), outb) if err != nil { return errors.Wrapf(err, "set for table %v failed", t.tableName) @@ -70,6 +75,11 @@ func (t *ValueTypeTable) Get(txn badgerwrap.Txn, key string) (*ValueType, error) return nil, 
errors.Wrapf(err, "value copy failed for table %v", t.tableName) } + valueBytes, err = DecompressValue(valueBytes) + if err != nil { + return nil, errors.Wrapf(err, "decompression failed for table %v", t.tableName) + } + retValue := &ValueType{} err = proto.Unmarshal(valueBytes, retValue) if err != nil { @@ -255,6 +265,10 @@ func (t *ValueTypeTable) RangeRead(txn badgerwrap.Txn, keyPrefix *KeyType, if err != nil { return nil, stats, err } + valueBytes, err = DecompressValue(valueBytes) + if err != nil { + return nil, stats, err + } retValue := &ValueType{} err = proto.Unmarshal(valueBytes, retValue) if err != nil { diff --git a/pkg/sloop/store/typed/watchactivitytablegen.go b/pkg/sloop/store/typed/watchactivitytablegen.go index f051db51..75a530f1 100644 --- a/pkg/sloop/store/typed/watchactivitytablegen.go +++ b/pkg/sloop/store/typed/watchactivitytablegen.go @@ -46,6 +46,11 @@ func (t *WatchActivityTable) Set(txn badgerwrap.Txn, key string, value *WatchAct return errors.Wrapf(err, "protobuf marshal for table %v failed", t.tableName) } + outb, err = CompressValue(outb) + if err != nil { + return errors.Wrapf(err, "compression for table %v failed", t.tableName) + } + err = txn.Set([]byte(key), outb) if err != nil { return errors.Wrapf(err, "set for table %v failed", t.tableName) @@ -72,6 +77,11 @@ func (t *WatchActivityTable) Get(txn badgerwrap.Txn, key string) (*WatchActivity return nil, errors.Wrapf(err, "value copy failed for table %v", t.tableName) } + valueBytes, err = DecompressValue(valueBytes) + if err != nil { + return nil, errors.Wrapf(err, "decompression failed for table %v", t.tableName) + } + retValue := &WatchActivity{} err = proto.Unmarshal(valueBytes, retValue) if err != nil { @@ -273,6 +283,10 @@ func (t *WatchActivityTable) RangeRead(txn badgerwrap.Txn, keyPrefix *WatchActiv if err != nil { return nil, stats, err } + valueBytes, err = DecompressValue(valueBytes) + if err != nil { + return nil, stats, err + } retValue := &WatchActivity{} err = 
proto.Unmarshal(valueBytes, retValue) if err != nil { diff --git a/pkg/sloop/store/typed/watchtablegen.go b/pkg/sloop/store/typed/watchtablegen.go index e8e85b2e..3ddfd102 100644 --- a/pkg/sloop/store/typed/watchtablegen.go +++ b/pkg/sloop/store/typed/watchtablegen.go @@ -46,6 +46,11 @@ func (t *KubeWatchResultTable) Set(txn badgerwrap.Txn, key string, value *KubeWa return errors.Wrapf(err, "protobuf marshal for table %v failed", t.tableName) } + outb, err = CompressValue(outb) + if err != nil { + return errors.Wrapf(err, "compression for table %v failed", t.tableName) + } + err = txn.Set([]byte(key), outb) if err != nil { return errors.Wrapf(err, "set for table %v failed", t.tableName) @@ -72,6 +77,11 @@ func (t *KubeWatchResultTable) Get(txn badgerwrap.Txn, key string) (*KubeWatchRe return nil, errors.Wrapf(err, "value copy failed for table %v", t.tableName) } + valueBytes, err = DecompressValue(valueBytes) + if err != nil { + return nil, errors.Wrapf(err, "decompression failed for table %v", t.tableName) + } + retValue := &KubeWatchResult{} err = proto.Unmarshal(valueBytes, retValue) if err != nil { @@ -273,6 +283,10 @@ func (t *KubeWatchResultTable) RangeRead(txn badgerwrap.Txn, keyPrefix *WatchTab if err != nil { return nil, stats, err } + valueBytes, err = DecompressValue(valueBytes) + if err != nil { + return nil, stats, err + } retValue := &KubeWatchResult{} err = proto.Unmarshal(valueBytes, retValue) if err != nil {