From f72638ec577da9bea0c887d687651c2cc9a4e109 Mon Sep 17 00:00:00 2001 From: piccione99 Date: Thu, 26 Mar 2026 20:04:14 -0400 Subject: [PATCH] update to latest wrpkafka --- go.mod | 4 +- go.sum | 8 ++-- integration_tests/splitter.yaml | 10 ++++ internal/app/app_test.go | 6 ++- internal/app/default-config.yaml | 3 +- internal/app/publisher.go | 5 +- internal/publisher/config.go | 22 +++++++-- internal/publisher/config_test.go | 80 +++++++++++++++++++++++++------ 8 files changed, 110 insertions(+), 28 deletions(-) diff --git a/go.mod b/go.mod index bc2ec85..4654406 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/twmb/franz-go/plugin/kprom v1.3.0 github.com/xmidt-org/httpaux v0.4.2 github.com/xmidt-org/wrp-go/v5 v5.4.0 - github.com/xmidt-org/wrpkafka v0.1.0 + github.com/xmidt-org/wrpkafka v0.1.2 go.uber.org/fx v1.24.0 gopkg.in/dealancer/validate.v2 v2.1.0 ) @@ -78,7 +78,7 @@ require ( github.com/tklauser/go-sysconf v0.3.16 // indirect github.com/tklauser/numcpus v0.11.0 // indirect github.com/ugorji/go/codec v1.2.12 // indirect - github.com/xmidt-org/eventor v1.0.23 // indirect + github.com/xmidt-org/eventor v1.0.36 // indirect github.com/xmidt-org/wrp-go/v3 v3.7.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect diff --git a/go.sum b/go.sum index 4afc8c3..a251781 100644 --- a/go.sum +++ b/go.sum @@ -209,8 +209,8 @@ github.com/xmidt-org/arrange v0.5.1 h1:JHoU8C03ZQfZAG9Vpudj7RQj97pAHblwm6Co8mepU github.com/xmidt-org/arrange v0.5.1/go.mod h1:zeJWY01z21ihPiAnGNF+bELbJANWwsahGE2bzGyyPIs= github.com/xmidt-org/candlelight v0.2.1 h1:vbstNO5Jz8EWbkuZtHCA8HV6ngFwp4IJgfiCW4vKM3c= github.com/xmidt-org/candlelight v0.2.1/go.mod h1:lA3d2jFvE3/xv0huXmB2yqiCoKxIBGUJkm3d8lvpU+k= -github.com/xmidt-org/eventor v1.0.23 h1:BYRLNlvFs2xdWFH2mJZxtoadV2LzXGhUsNgdd/XsbvY= -github.com/xmidt-org/eventor v1.0.23/go.mod h1:rnoyWsy2Emwa3sNMF5x0c/ykUy4sk5W+1zyLUOmP1gw= +github.com/xmidt-org/eventor v1.0.36 h1:4jj/vkjkjB6CyDd73v+58OKif4nSFL+gYBDZaZKMXQ4= +github.com/xmidt-org/eventor v1.0.36/go.mod h1:hDYVga+QLP8ZfwIBtmDvXRtYbY2dPa6i0PNQrqtwYUg= github.com/xmidt-org/httpaux v0.4.2 h1:O6KTPy9Dtx6m5Ezb0nCQeAZWvBDgZvx82P16FSNauIE= github.com/xmidt-org/httpaux v0.4.2/go.mod h1:tZJ+SBoGNCxDOLopuSqrxaCkIVAQ+aPjNRf2XfMVwJA= github.com/xmidt-org/touchstone v0.1.7 h1:gi3uXLDhXONe+vdVAa9xQBucMD/tJ74NMER2Lw2lo7U= @@ -219,8 +219,8 @@ github.com/xmidt-org/wrp-go/v3 v3.7.0 h1:m9ghdq79Zzb0WjomUJ02rzFpI0RK8KTjArYpNIw github.com/xmidt-org/wrp-go/v3 v3.7.0/go.mod h1:eyMj+q/7LQ4SU6Z3s6VOwuTVSh6/DJBb2soBGBFSung= github.com/xmidt-org/wrp-go/v5 v5.4.0 h1:9bOO8e3uR+qek9uTmpZRahVJHbz6FM0Y7dLleLZ3MNY= github.com/xmidt-org/wrp-go/v5 v5.4.0/go.mod h1:SQlOnkDG5HnaziOf83g+/38GQfE2mIInPWP+Wdw3/FE= -github.com/xmidt-org/wrpkafka v0.1.0 h1:I7LFrFs8FyaBt2lQSTARuaBBnpFl6HQX5VL4ar61ufs= -github.com/xmidt-org/wrpkafka v0.1.0/go.mod h1:piR4kxV0gelMk+uK3Hb5aPj3v0oTCsUcV4HEpfIJIKk= +github.com/xmidt-org/wrpkafka v0.1.2 h1:6sKSujPhi40YWgSWwnImuDNw4tnjZHoRD7KsJtiWnPg= +github.com/xmidt-org/wrpkafka v0.1.2/go.mod h1:OrQDfmzIOESfLCPBQDbC6iOT7pFZLpvxrbL50tQ79v4= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= diff --git a/integration_tests/splitter.yaml b/integration_tests/splitter.yaml index ca6b926..66d0c5f 100644 --- a/integration_tests/splitter.yaml +++ b/integration_tests/splitter.yaml @@ -29,8 +29,18 @@ producer((replace)): topic_routes: - pattern: "*" topic: "default-events" + hash_key: "source" # Extract device ID from source field - pattern: "device-status" topic: "device-status-events" + hash_key: "source" # Extract device ID from source field + +buckets: + possible_buckets: + - name: "bucket-0" + threshold: 1.0 + target_bucket: "bucket-0" + partition_key_type: "source" # Extract device ID from source field for partitioning + missing_partition_key_action: "include" consumer((replace)): brokers: diff --git a/internal/app/app_test.go b/internal/app/app_test.go index df516f5..d65b587 100644 --- a/internal/app/app_test.go +++ b/internal/app/app_test.go @@ -405,15 +405,19 @@ func TestProvideConsumer(t *testing.T) { { Topic: "wrp-events", Pattern: "*", + HashKey: "source", }, }, } + routes, err := pubConfig.ToWRPKafkaRoutes() + require.NoError(t, err, "Setup should convert routes successfully") + pub, err := publisher.New( publisher.WithLogEmitter(logEmitter), publisher.WithMetricsEmitter(metricEmitter), publisher.WithBrokers(pubConfig.Brokers), - publisher.WithTopicRoutes(pubConfig.ToWRPKafkaRoutes()...), + publisher.WithTopicRoutes(routes...), ) require.NoError(t, err, "Setup should create publisher successfully") return pub diff --git a/internal/app/default-config.yaml b/internal/app/default-config.yaml index 4656591..2c97e2c 100644 --- a/internal/app/default-config.yaml +++ b/internal/app/default-config.yaml @@ -43,7 +43,7 @@ buckets: # List of all bucket names (e.g. "bucket-0", "bucket-1", "bucket-2") possible_buckets: - name: "bucket-0" - threshold: 0.5 + threshold: 1.0 # Required: Target bucket name to route messages to target_bucket: "bucket-0" @@ -54,6 +54,7 @@ producer: topic_routes: - pattern: "*" topic: "default-events" + hash_key: "metadata/hw-deviceid" # Use hardware device ID from metadata for partitioning brokers: restart_on_config_change: false target_region: us-east-1 diff --git a/internal/app/publisher.go b/internal/app/publisher.go index 176b4a7..b030612 100644 --- a/internal/app/publisher.go +++ b/internal/app/publisher.go @@ -33,7 +33,10 @@ func providePublisher(in PublisherIn) (PublisherOut, error) { cfg := in.Config // Convert config routes to wrpkafka routes - wrpRoutes := cfg.ToWRPKafkaRoutes() + wrpRoutes, err := cfg.ToWRPKafkaRoutes() + if err != nil { + return PublisherOut{}, fmt.Errorf("failed to convert topic routes: %w", err) + } // Build options from configuration - validation is handled by the option functions opts := []publisher.Option{ diff --git a/internal/publisher/config.go b/internal/publisher/config.go index 597c981..3e27b05 100644 --- a/internal/publisher/config.go +++ b/internal/publisher/config.go @@ -4,6 +4,7 @@ package publisher import ( + "fmt" "time" "github.com/xmidt-org/wrpkafka" @@ -43,24 +44,35 @@ type Brokers struct { type TopicRoute struct { Topic string Pattern string + HashKey string } // ToWRPKafkaRoute converts a TopicRoute to a wrpkafka.TopicRoute -func (tr TopicRoute) ToWRPKafkaRoute() wrpkafka.TopicRoute { +func (tr TopicRoute) ToWRPKafkaRoute() (wrpkafka.TopicRoute, error) { + hashKey, err := wrpkafka.ParseHashKey(tr.HashKey) + if err != nil { + return wrpkafka.TopicRoute{}, fmt.Errorf("failed to parse hash key %q: %w", tr.HashKey, err) + } + route := wrpkafka.TopicRoute{ Topic: tr.Topic, Pattern: wrpkafka.Pattern(tr.Pattern), + HashKey: hashKey, } - return route + return route, nil } // ToWRPKafkaRoutes converts all TopicRoutes to wrpkafka.TopicRoute slice -func (c Config) ToWRPKafkaRoutes() []wrpkafka.TopicRoute { +func (c Config) ToWRPKafkaRoutes() ([]wrpkafka.TopicRoute, error) { routes := make([]wrpkafka.TopicRoute, len(c.TopicRoutes)) for i, route := range c.TopicRoutes { - routes[i] = route.ToWRPKafkaRoute() + wrpRoute, err := route.ToWRPKafkaRoute() + if err != nil { + return nil, fmt.Errorf("failed to convert route %d: %w", i, err) + } + routes[i] = wrpRoute } - return routes + return routes, nil } // SASLConfig represents SASL authentication configuration diff --git a/internal/publisher/config_test.go b/internal/publisher/config_test.go index 6d9328a..c00f812 100644 --- a/internal/publisher/config_test.go +++ b/internal/publisher/config_test.go @@ -46,6 +46,7 @@ func (suite *ConfigTestSuite) TestTopicRoute_ToWRPKafkaRoute() { name string topicRoute TopicRoute expected wrpkafka.TopicRoute + expectError bool description string }{ { @@ -53,10 +54,12 @@ func (suite *ConfigTestSuite) TestTopicRoute_ToWRPKafkaRoute() { topicRoute: TopicRoute{ Topic: "events", Pattern: "event:.*", + HashKey: "source", }, expected: wrpkafka.TopicRoute{ Topic: "events", Pattern: wrpkafka.Pattern("event:.*"), + HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeySource}, }, description: "Should convert TopicRoute to wrpkafka.TopicRoute correctly", }, @@ -65,31 +68,64 @@ func (suite *ConfigTestSuite) TestTopicRoute_ToWRPKafkaRoute() { topicRoute: TopicRoute{ Topic: "commands", Pattern: "mac:.*/command", + HashKey: "metadata/hw-deviceid", }, expected: wrpkafka.TopicRoute{ Topic: "commands", Pattern: wrpkafka.Pattern("mac:.*/command"), + HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeyMetadata, MetadataField: "hw-deviceid"}, }, - description: "Should handle complex routing patterns", + description: "Should handle complex routing patterns with metadata hash key", }, { name: "wildcard_route", topicRoute: TopicRoute{ Topic: "all-messages", Pattern: ".*", + HashKey: "none", }, expected: wrpkafka.TopicRoute{ Topic: "all-messages", Pattern: wrpkafka.Pattern(".*"), + HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeyNone}, + }, + description: "Should handle wildcard patterns with no hash key", + }, + { + name: "route_with_default_hash_key", + topicRoute: TopicRoute{ + Topic: "events", + Pattern: "event:.*", + HashKey: "", // Empty should default to metadata/hw-deviceid + }, + expected: wrpkafka.TopicRoute{ + Topic: "events", + Pattern: wrpkafka.Pattern("event:.*"), + HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeyMetadata, MetadataField: "hw-deviceid"}, + }, + description: "Should default to metadata/hw-deviceid when hash_key is empty", + }, + { + name: "invalid_hash_key", + topicRoute: TopicRoute{ + Topic: "events", + Pattern: "event:.*", + HashKey: "invalid", }, - description: "Should handle wildcard patterns", + expectError: true, + description: "Should return error for invalid hash key", }, } for _, tt := range tests { suite.Run(tt.name, func() { - result := tt.topicRoute.ToWRPKafkaRoute() - suite.Equal(tt.expected, result, tt.description) + result, err := tt.topicRoute.ToWRPKafkaRoute() + if tt.expectError { + suite.Error(err, tt.description) + } else { + suite.NoError(err, tt.description) + suite.Equal(tt.expected, result, tt.description) + } }) } } @@ -100,17 +136,18 @@ func (suite *ConfigTestSuite) TestConfig_ToWRPKafkaRoutes() { name string config Config expected []wrpkafka.TopicRoute + expectError bool description string }{ { name: "single_route", config: Config{ TopicRoutes: []TopicRoute{ - {Topic: "events", Pattern: "event:.*"}, + {Topic: "events", Pattern: "event:.*", HashKey: "source"}, }, }, expected: []wrpkafka.TopicRoute{ - {Topic: "events", Pattern: wrpkafka.Pattern("event:.*")}, + {Topic: "events", Pattern: wrpkafka.Pattern("event:.*"), HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeySource}}, }, description: "Should convert single route correctly", }, @@ -118,15 +155,15 @@ func (suite *ConfigTestSuite) TestConfig_ToWRPKafkaRoutes() { name: "multiple_routes", config: Config{ TopicRoutes: []TopicRoute{ - {Topic: "events", Pattern: "event:.*"}, - {Topic: "commands", Pattern: "mac:.*/command"}, - {Topic: "responses", Pattern: ".*response.*"}, + {Topic: "events", Pattern: "event:.*", HashKey: "source"}, + {Topic: "commands", Pattern: "mac:.*/command", HashKey: "metadata/hw-deviceid"}, + {Topic: "responses", Pattern: ".*response.*", HashKey: "none"}, }, }, expected: []wrpkafka.TopicRoute{ - {Topic: "events", Pattern: wrpkafka.Pattern("event:.*")}, - {Topic: "commands", Pattern: wrpkafka.Pattern("mac:.*/command")}, - {Topic: "responses", Pattern: wrpkafka.Pattern(".*response.*")}, + {Topic: "events", Pattern: wrpkafka.Pattern("event:.*"), HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeySource}}, + {Topic: "commands", Pattern: wrpkafka.Pattern("mac:.*/command"), HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeyMetadata, MetadataField: "hw-deviceid"}}, + {Topic: "responses", Pattern: wrpkafka.Pattern(".*response.*"), HashKey: wrpkafka.HashKey{Name: wrpkafka.HashKeyNone}}, }, description: "Should convert multiple routes correctly", }, @@ -138,12 +175,27 @@ func (suite *ConfigTestSuite) TestConfig_ToWRPKafkaRoutes() { expected: []wrpkafka.TopicRoute{}, description: "Should handle empty routes slice", }, + { + name: "route_with_invalid_hash_key", + config: Config{ + TopicRoutes: []TopicRoute{ + {Topic: "events", Pattern: "event:.*", HashKey: "invalid"}, + }, + }, + expectError: true, + description: "Should return error for invalid hash key", + }, } for _, tt := range tests { suite.Run(tt.name, func() { - result := tt.config.ToWRPKafkaRoutes() - suite.Equal(tt.expected, result, tt.description) + result, err := tt.config.ToWRPKafkaRoutes() + if tt.expectError { + suite.Error(err, tt.description) + } else { + suite.NoError(err, tt.description) + suite.Equal(tt.expected, result, tt.description) + } }) } }