From 7dd3dc3e4cecd252361b2031df3b30dd9465fac3 Mon Sep 17 00:00:00 2001 From: Joni Collinge Date: Tue, 7 Aug 2018 16:49:16 +0100 Subject: [PATCH 1/4] Added POC to send data to external Kafka service --- pkg/kafka/producer/config.go | 93 ++++++++++++++++++++++++++++++++- plugin/storage/kafka/options.go | 54 +++++++++++++++++-- 2 files changed, 141 insertions(+), 6 deletions(-) diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index 5079ca8fef2..9c1e1f3505f 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -15,6 +15,9 @@ package producer import ( + "crypto/tls" + "fmt" + "github.com/Shopify/sarama" ) @@ -25,12 +28,100 @@ type Builder interface { // Configuration describes the configuration properties needed to create a Kafka producer type Configuration struct { - Brokers []string + Brokers []string + Authenticators []Authenticator + Metadata bool + Version string } // NewProducer creates a new asynchronous kafka producer func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { saramaConfig := sarama.NewConfig() saramaConfig.Producer.Return.Successes = true + var err error + // Last write wins on conflict + for _, auth := range c.Authenticators { + saramaConfig, err = auth.Authenticate(saramaConfig) + if err != nil { + return nil, err + } + } + saramaConfig.Metadata.Full = c.Metadata + if c.Version == "" { + saramaConfig.Version = sarama.MinVersion + } else { + v, err := sarama.ParseKafkaVersion(c.Version) + if err != nil { + return nil, err + } + saramaConfig.Version = v + } return sarama.NewAsyncProducer(c.Brokers, saramaConfig) } + +type AuthConfiguration interface{} + +type SASLAuthConfiguration struct { + Username string + Password string +} + +type TLSAuthConfiguration struct { + Enabled bool + Config *tls.Config +} + +type Authenticator interface { + Authenticate(config *sarama.Config) (*sarama.Config, error) +} + +type SASLAuthenticator struct { + config *SASLAuthConfiguration +} + +func NewSASLAuthenticator(auth AuthConfiguration) (*SASLAuthenticator, error) { + c, ok := auth.(SASLAuthConfiguration) + if !ok { + return nil, fmt.Errorf("cannot type assert AuthConfiguration into SASLAuthConfiguration") + } + return &SASLAuthenticator{ + config: &c, + }, nil +} + +func (s *SASLAuthenticator) Authenticate(config *sarama.Config) (*sarama.Config, error) { + if s.config == nil || config == nil { + return nil, fmt.Errorf("error") + } + config.Net.SASL.Enable = true + config.Net.SASL.User = s.config.Username + config.Net.SASL.Password = s.config.Password + config.Net.TLS.Enable = true + config.Version = sarama.V0_10_0_0 + return config, nil +} + +type TLSAuthenticator struct { + config *TLSAuthConfiguration +} + +func NewTLSAuthenticator(auth AuthConfiguration) (*TLSAuthenticator, error) { + c, ok := auth.(TLSAuthConfiguration) + if !ok { + return nil, fmt.Errorf("cannot type assert AuthConfiguration into TLSAuthConfiguration") + } + return &TLSAuthenticator{ + config: &c, + }, nil +} + +func (t *TLSAuthenticator) Authenticate(config *sarama.Config) (*sarama.Config, error) { + if t.config == nil || config == nil { + return nil, fmt.Errorf("error") + } + config.Net.TLS.Enable = t.config.Enabled + if t.config.Config != nil { + config.Net.TLS.Config = t.config.Config + } + return config, nil +} diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index e93ea8a75f9..d7506f66f08 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -25,10 +25,17 @@ import ( ) const ( - configPrefix = "kafka" - suffixBrokers = ".brokers" - suffixTopic = ".topic" - suffixEncoding = ".encoding" + configPrefix = "kafka" + suffixBrokers = ".brokers" + suffixTopic = ".topic" + suffixEncoding = ".encoding" + suffixSaslEnabled = ".sasl.enabled" + suffixSaslUsername = ".sasl.username" + suffixSaslPassword = ".sasl.password" + suffixMetadata = ".metadata" + suffixTLSEnabled = ".tls.enabled" + //TODO Add required TLS config + suffixVersion = ".version" encodingJSON = "json" encodingProto = "protobuf" @@ -64,8 +71,45 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { // InitFromViper initializes Options with properties from viper func (opt *Options) InitFromViper(v *viper.Viper) { + auths := make([]producer.Authenticator, 0) + saslEnabled := v.GetBool(configPrefix + suffixSaslEnabled) + if saslEnabled { + authConfig := producer.SASLAuthConfiguration{ + Username: v.GetString(configPrefix + suffixSaslUsername), + Password: v.GetString(configPrefix + suffixSaslPassword), + } + auth, err := producer.NewSASLAuthenticator(authConfig) + if err != nil { + panic(fmt.Sprintf("cannot initialize new SASL authenticator: %+v", err)) + } + auths = append(auths, auth) + } + tlsEnabled := v.GetBool(configPrefix + suffixTLSEnabled) + if tlsEnabled { + //TODO Build full TLS configuration + authConfig := producer.TLSAuthConfiguration{ + Enabled: true, + } + auth, err := producer.NewTLSAuthenticator(authConfig) + if err != nil { + panic(fmt.Sprintf("cannot initialize new TLS authenticator: %+v", err)) + } + auths = append(auths, auth) + } + fullMetadata := true + metadata := v.Get(configPrefix + suffixMetadata) + if metadata != nil { + if m, ok := metadata.(bool); !ok { + panic(fmt.Sprintf("config value %s%s must be a bool (true/false)", configPrefix, suffixMetadata)) + } else { + fullMetadata = m + } + } opt.config = producer.Configuration{ - Brokers: strings.Split(v.GetString(configPrefix+suffixBrokers), ","), + Brokers: strings.Split(v.GetString(configPrefix+suffixBrokers), ","), + Authenticators: auths, + Metadata: fullMetadata, + Version: v.GetString(configPrefix + suffixVersion), } opt.topic = v.GetString(configPrefix + suffixTopic) opt.encoding = v.GetString(configPrefix + suffixEncoding) From f364c6c36b1c628c71d2b92f8bf43e919396024d Mon Sep 17 00:00:00 2001 From: Joni Collinge Date: Tue, 7 Aug 2018 17:26:40 +0100 Subject: [PATCH 2/4] Removed hardcoded values --- pkg/kafka/producer/config.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index 9c1e1f3505f..62470a39398 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -96,8 +96,6 @@ func (s *SASLAuthenticator) Authenticate(config *sarama.Config) (*sarama.Config, config.Net.SASL.Enable = true config.Net.SASL.User = s.config.Username config.Net.SASL.Password = s.config.Password - config.Net.TLS.Enable = true - config.Version = sarama.V0_10_0_0 return config, nil } From 4f63d669daa5b90570cd457590818931215007ce Mon Sep 17 00:00:00 2001 From: Joni Collinge Date: Wed, 15 Aug 2018 15:06:44 +0100 Subject: [PATCH 3/4] Added default flags --- .gitignore | 1 + plugin/storage/kafka/options.go | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/.gitignore b/.gitignore index bbe7333e369..2f01e3a758b 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ cmd/query/query cmd/query/query-linux crossdock/crossdock-linux run-crossdock.log +.vscode/ diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index d7506f66f08..3a500daac6b 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -67,6 +67,26 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { defaultEncoding, fmt.Sprintf(`Encoding of spans ("%s" or "%s") sent to kafka.`, encodingProto, encodingJSON), ) + flagSet.String( + configPrefix+suffixSaslEnabled, + false, + fmt.Sprintf("Enable SASL configuration"), + ) + flagSet.String( + configPrefix+suffixSaslUsername, + "", + fmt.Sprintf("SASL username"), + ) + flagSet.String( + configPrefix+suffixSaslPassword, + "", + fmt.Sprintf("SASL password"), + ) + flagSet.String( + configPrefix+suffixTLSEnabled, + "", + fmt.Sprintf("Enable TLS configuration"), + ) } // InitFromViper initializes Options with properties from viper From 16300552f8a78b40edf4fe200d38d90ef645d7d4 Mon Sep 17 00:00:00 2001 From: Joni Collinge Date: Wed, 15 Aug 2018 15:08:24 +0100 Subject: [PATCH 4/4] Added phony comments to bypass lint --- pkg/kafka/producer/config.go | 10 ++++++++++ plugin/storage/kafka/options.go | 6 +++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index 62470a39398..2222e08402d 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -59,26 +59,32 @@ func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { return sarama.NewAsyncProducer(c.Brokers, saramaConfig) } +// AuthConfiguration ... type AuthConfiguration interface{} +// SASLAuthConfiguration ... type SASLAuthConfiguration struct { Username string Password string } +// TLSAuthConfiguration ... type TLSAuthConfiguration struct { Enabled bool Config *tls.Config } +// Authenticator ... type Authenticator interface { Authenticate(config *sarama.Config) (*sarama.Config, error) } +// SASLAuthenticator ... type SASLAuthenticator struct { config *SASLAuthConfiguration } +// NewSASLAuthenticator ... func NewSASLAuthenticator(auth AuthConfiguration) (*SASLAuthenticator, error) { c, ok := auth.(SASLAuthConfiguration) if !ok { @@ -89,6 +95,7 @@ func NewSASLAuthenticator(auth AuthConfiguration) (*SASLAuthenticator, error) { }, nil } +// Authenticate ... func (s *SASLAuthenticator) Authenticate(config *sarama.Config) (*sarama.Config, error) { if s.config == nil || config == nil { return nil, fmt.Errorf("error") @@ -99,10 +106,12 @@ func (s *SASLAuthenticator) Authenticate(config *sarama.Config) (*sarama.Config, return config, nil } +// TLSAuthenticator ... type TLSAuthenticator struct { config *TLSAuthConfiguration } +// NewTLSAuthenticator ... func NewTLSAuthenticator(auth AuthConfiguration) (*TLSAuthenticator, error) { c, ok := auth.(TLSAuthConfiguration) if !ok { @@ -113,6 +122,7 @@ func NewTLSAuthenticator(auth AuthConfiguration) (*TLSAuthenticator, error) { }, nil } +// Authenticate ... func (t *TLSAuthenticator) Authenticate(config *sarama.Config) (*sarama.Config, error) { if t.config == nil || config == nil { return nil, fmt.Errorf("error") diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index 3a500daac6b..fefedce4aaf 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -67,7 +67,7 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { defaultEncoding, fmt.Sprintf(`Encoding of spans ("%s" or "%s") sent to kafka.`, encodingProto, encodingJSON), ) - flagSet.String( + flagSet.Bool( configPrefix+suffixSaslEnabled, false, fmt.Sprintf("Enable SASL configuration"), @@ -82,9 +82,9 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { "", fmt.Sprintf("SASL password"), ) - flagSet.String( + flagSet.Bool( configPrefix+suffixTLSEnabled, - "", + false, fmt.Sprintf("Enable TLS configuration"), ) }