diff --git a/.ci/run.sh b/.ci/run.sh
new file mode 100755
index 0000000..9822cd1
--- /dev/null
+++ b/.ci/run.sh
@@ -0,0 +1,46 @@
+#!/bin/bash
+# This is intended to be run inside the docker container as the command of the docker-compose.
+#
+# Runs `bundle exec rake test` unless INTEGRATION=true, in which case the
+# integration specs are run once per Kafka:Confluent pair listed in VERSIONS.
+
+env
+
+set -ex
+
+if [[ "$INTEGRATION" != "true" ]]; then
+  bundle exec rake test
+else
+  # Define the Kafka:Confluent version pairs
+  VERSIONS=(
+    "4.1.0:8.0.0"
+  )
+
+  for pair in "${VERSIONS[@]}"; do
+    KAFKA_VERSION="${pair%%:*}"
+    CONFLUENT_VERSION="${pair##*:}"
+
+    echo "=================================================="
+    echo " Testing with Kafka $KAFKA_VERSION / Confluent $CONFLUENT_VERSION"
+    echo "=================================================="
+
+    export KAFKA_VERSION
+    export CONFLUENT_VERSION
+
+    # Run each helper in a subshell: under `set -e` a failing non-final command
+    # of an `a && b && c` list does not abort the script, but a failing subshell
+    # does, and the working directory of this script is never changed.
+    (cd spec/integration && ./kafka_test_setup.sh)
+
+    # Keep the rspec status instead of exiting right away, so that the teardown
+    # script still runs when specs fail.
+    status=0
+    bundle exec rspec -fd --tag integration || status=$?
+
+    (cd spec/integration && ./kafka_test_teardown.sh)
+
+    if [[ "$status" -ne 0 ]]; then
+      exit "$status"
+    fi
+  done
+fi
diff --git a/.travis.yml b/.travis.yml
index a50fc73..890a1c5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,2 +1,11 @@
 import:
-- logstash-plugins/.ci:travis/travis.yml@1.x
\ No newline at end of file
+- logstash-plugins/.ci:travis/travis.yml@1.x
+
+jobs:
+  include:
+    - stage: "Integration Tests"
+      env: INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.current
+    - env: INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=9.current
+    - env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.current
+    - env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=9.current
+    - env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=main
\ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8a0dd55..e770f20 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,17 @@
+## 3.5.0
+  - Add SSL/TLS support for HTTPS schema registry connections
+  - Add `ssl_enabled` option to enable/disable SSL
+  - Add `ssl_certificate` and `ssl_key` options for PEM-based client authentication
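(unencrypted keys only) + - Add `ssl_certificate_authorities` option for PEM-based server certificate validation + - Add `ssl_verification_mode` option to control SSL verification (full, none) + - Add `ssl_cipher_suites` option to configure cipher suites + - Add `ssl_supported_protocols` option to configure TLS protocol versions (TLSv1.1, TLSv1.2, TLSv1.3) + - Add `ssl_truststore_path` and `ssl_truststore_password` options for server certificate validation (JKS/PKCS12) + - Add `ssl_keystore_path` and `ssl_keystore_password` options for mutual TLS authentication (JKS/PKCS12) + - Add `ssl_truststore_type` and `ssl_keystore_type` options (JKS or PKCS12) + - Add HTTP proxy support with `proxy` option + - Add HTTP basic authentication support with `username` and `password` options + ## 3.4.1 - Fixes `(Errno::ENOENT) No such file or directory` error [#43](https://github.com/logstash-plugins/logstash-codec-avro/pull/43) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 7695ac2..1b13d1e 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -83,9 +83,25 @@ output { |Setting |Input type|Required | <<plugins-{type}s-{plugin}-ecs_compatibility>> | <<string,string>>|No | <<plugins-{type}s-{plugin}-encoding>> | <<string,string>>, one of `["binary", "base64"]`|No +| <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No +| <<plugins-{type}s-{plugin}-proxy>> |<<uri,uri>>|No | <<plugins-{type}s-{plugin}-schema_uri>> |<<string,string>>|Yes +| <<plugins-{type}s-{plugin}-ssl_certificate>> |<<path,path>>|No +| <<plugins-{type}s-{plugin}-ssl_certificate_authorities>> |list of <<path,path>>|No +| <<plugins-{type}s-{plugin}-ssl_cipher_suites>> |<<array,array>>|No +| <<plugins-{type}s-{plugin}-ssl_enabled>> |<<boolean,boolean>>|No +| <<plugins-{type}s-{plugin}-ssl_key>> |<<path,path>>|No +| <<plugins-{type}s-{plugin}-ssl_keystore_password>> |<<password,password>>|No +| <<plugins-{type}s-{plugin}-ssl_keystore_path>> |<<path,path>>|No +| <<plugins-{type}s-{plugin}-ssl_keystore_type>> |<<string,string>>|No +| <<plugins-{type}s-{plugin}-ssl_supported_protocols>> |<<array,array>>|No +| <<plugins-{type}s-{plugin}-ssl_truststore_password>> |<<password,password>>|No +| <<plugins-{type}s-{plugin}-ssl_truststore_path>> |<<path,path>>|No +| <<plugins-{type}s-{plugin}-ssl_truststore_type>> |<<string,string>>|No +| <<plugins-{type}s-{plugin}-ssl_verification_mode>> |<<string,string>>|No | <<plugins-{type}s-{plugin}-tag_on_failure>> |<<boolean,boolean>>|No | <<plugins-{type}s-{plugin}-target>> |<<string,string>>|No +| <<plugins-{type}s-{plugin}-username>> |<<string,string>>|No |=======================================================================   @@ -112,6 +128,23 @@ Use `base64` (default) to indicate that this codec sends or expects to receive b Set this option to `binary` to indicate that this codec sends or expects to receive binary Avro data. +[id="plugins-{type}s-{plugin}-password"] +===== `password` + +* Value type is <<password,password>> +* There is no default value for this setting. + +Password for HTTP basic authentication when fetching remote schemas. +Used together with `username`. 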
+ +[id="plugins-{type}s-{plugin}-proxy"] +===== `proxy` + +* Value type is <<uri,uri>> +* There is no default value for this setting. + +The address of a forward HTTP proxy to use when contacting a remote schema registry. + [id="plugins-{type}s-{plugin}-schema_uri"] ===== `schema_uri` @@ -134,6 +167,172 @@ example: tag events with `_avroparsefailure` when decode fails +[id="plugins-{type}s-{plugin}-ssl_certificate"] +===== `ssl_certificate` + +* Value type is <<path,path>> +* There is no default value for this setting. + +Path to PEM encoded certificate file for client authentication (mutual TLS). +You may use this setting or <<plugins-{type}s-{plugin}-ssl_keystore_path>>, but not both simultaneously. + +*Example* +[source,ruby] +---------------------------------- +ssl_certificate => "/path/to/client.crt" +---------------------------------- + +[id="plugins-{type}s-{plugin}-ssl_certificate_authorities"] +===== `ssl_certificate_authorities` + +* Value type is a list of <<path,path>> +* There is no default value for this setting. + +Path to the PEM encoded CA certificate file for server verification. Only a single file is currently supported. +This is an alternative to using <<plugins-{type}s-{plugin}-ssl_truststore_path>>. +You may use this setting or <<plugins-{type}s-{plugin}-ssl_truststore_path>>, but not both simultaneously. + +*Example* +[source,ruby] +---------------------------------- +ssl_certificate_authorities => ["/path/to/ca.crt"] +---------------------------------- + +[id="plugins-{type}s-{plugin}-ssl_cipher_suites"] +===== `ssl_cipher_suites` + +* Value type is <<array,array>> +* There is no default value for this setting. + +The list of cipher suites to use, listed in order of priority. +Supported cipher suites vary depending on which version of Java is used. + +[id="plugins-{type}s-{plugin}-ssl_key"] +===== `ssl_key` + +* Value type is <<path,path>> +* There is no default value for this setting. + +Path to PEM encoded private key file for client authentication. +Must be used together with <<plugins-{type}s-{plugin}-ssl_certificate>>. +The private key must be unencrypted (passphrase-protected keys are not supported). 
+ +*Example* +[source,ruby] +---------------------------------- +ssl_key => "/path/to/client.key" +---------------------------------- + +[id="plugins-{type}s-{plugin}-ssl_enabled"] +===== `ssl_enabled` + +* Value type is <<boolean,boolean>> +* There is no default value for this setting. + +Enable SSL/TLS secured communication to the remote schema registry. +When using HTTPS schema URIs, SSL is automatically enabled. + +[id="plugins-{type}s-{plugin}-ssl_keystore_path"] +===== `ssl_keystore_path` + +* Value type is <<path,path>> +* There is no default value for this setting. + +The path to the JKS or PKCS12 keystore file for client certificate authentication. +Use this when the schema registry requires mutual TLS (mTLS) authentication. + +[id="plugins-{type}s-{plugin}-ssl_keystore_password"] +===== `ssl_keystore_password` + +* Value type is <<password,password>> +* There is no default value for this setting. + +The password for the keystore file specified in <<plugins-{type}s-{plugin}-ssl_keystore_path>>. + + +[id="plugins-{type}s-{plugin}-ssl_keystore_type"] +===== `ssl_keystore_type` + +* Value type is <<string,string>> +* There is no default value for this setting. + +The format of the keystore file. It must be either `jks` or `pkcs12`. + +[id="plugins-{type}s-{plugin}-ssl_supported_protocols"] +===== `ssl_supported_protocols` + +* Value type is <<array,array>> +* Default value is `[]` (uses Java defaults) +* Valid values are: `TLSv1.1`, `TLSv1.2`, `TLSv1.3` + +List of allowed SSL/TLS protocol versions. +When not specified, the JVM defaults are used. + +[id="plugins-{type}s-{plugin}-ssl_truststore_path"] +===== `ssl_truststore_path` + +* Value type is <<path,path>> +* There is no default value for this setting. + +The path to the JKS or PKCS12 truststore file containing certificates to verify +the schema registry server's certificate. 
+ +*Example* +[source,ruby] +---------------------------------- +input { + kafka { + codec => avro { + schema_uri => "https://schema-registry.example.com:8081/schemas/ids/1" + ssl_truststore_path => "/path/to/truststore.jks" + ssl_truststore_password => "${TRUSTSTORE_PASSWORD}" + } + } +} +---------------------------------- + +[id="plugins-{type}s-{plugin}-ssl_truststore_password"] +===== `ssl_truststore_password` + +* Value type is <<password,password>> +* There is no default value for this setting. + +The password for the truststore file specified in <<plugins-{type}s-{plugin}-ssl_truststore_path>>. + +[id="plugins-{type}s-{plugin}-ssl_truststore_type"] +===== `ssl_truststore_type` + +* Value type is <<string,string>> +* There is no default value for this setting. + +The format of the truststore file. It must be either `jks` or `pkcs12`. + +[id="plugins-{type}s-{plugin}-ssl_verification_mode"] +===== `ssl_verification_mode` + +* Value type is <<string,string>> +* Default value is `"full"` +* Valid options are: `full`, `none` + +Options to verify the server's certificate: + +* `full`: Validates that the provided certificate has an issue date that's within the not_before and not_after dates; chains to a trusted Certificate Authority (CA); has a hostname or IP address that matches the names within the certificate. (recommended) +* `none`: Performs no certificate validation. **Warning:** Disabling this severely compromises security (https://www.cs.utexas.edu/~shmat/shmat_ccs12.pdf) + +*Example* +[source,ruby] +---------------------------------- +input { + kafka { + codec => avro { + schema_uri => "https://schema-registry.example.com:8081/schemas/ids/1" + ssl_certificate_authorities => ["/path/to/ca.crt"] + ssl_verification_mode => "full" + } + } +} +---------------------------------- + [id="plugins-{type}s-{plugin}-target"] ===== `target` @@ -156,3 +355,26 @@ input { } } ---------------------------------- + +[id="plugins-{type}s-{plugin}-username"] +===== `username` + +* Value type is <<string,string>> +* There is no default value for this setting. 
+ +Username for HTTP basic authentication when fetching remote schemas. +Used together with `password`. + +*Example* +[source,ruby] +---------------------------------- +input { + kafka { + codec => avro { + schema_uri => "https://schema-registry.example.com:8081/schemas/ids/1" + username => "registry_user" + password => "${REGISTRY_PASSWORD}" + } + } +} +---------------------------------- diff --git a/lib/logstash/codecs/avro.rb b/lib/logstash/codecs/avro.rb index 31c543c..3eddff6 100644 --- a/lib/logstash/codecs/avro.rb +++ b/lib/logstash/codecs/avro.rb @@ -1,7 +1,9 @@ # encoding: utf-8 require "open-uri" +require "manticore" require "avro" require "base64" +require "json" require "logstash/codecs/base" require "logstash/event" require "logstash/timestamp" @@ -50,6 +52,17 @@ # } # ---------------------------------- class LogStash::Codecs::Avro < LogStash::Codecs::Base + class BadResponseCodeError < LogStash::Error + attr_reader :code, :message, :uri + + def initialize(code, message, uri) + @code = code + @message = message + @uri = uri + super("HTTP #{code}: #{message} (#{uri})") + end + end + config_name "avro" include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1) @@ -84,9 +97,59 @@ class LogStash::Codecs::Avro < LogStash::Codecs::Base # NOTE: the target is only relevant while decoding data into a new event. 
config :target, :validate => :field_reference
 
-  def open_and_read(uri_string)
-    URI.open(uri_string, &:read)
-  end
+  # Proxy server URL for schema registry connections
+  config :proxy, :validate => :uri
+
+  # Username for HTTP basic authentication
+  config :username, :validate => :string
+
+  # Password for HTTP basic authentication
+  config :password, :validate => :password
+
+  # Enable SSL/TLS secured communication to remote schema registry
+  config :ssl_enabled, :validate => :boolean
+
+  # PEM-based SSL configuration (alternative to keystore/truststore)
+  # Path to PEM encoded certificate file for client authentication
+  config :ssl_certificate, :validate => :path
+
+  # Path to PEM encoded private key file for client authentication
+  config :ssl_key, :validate => :path
+
+  # Path to the PEM encoded CA certificate file for server verification
+  # Only a single CA file is supported: more than one value is rejected by validation
+  config :ssl_certificate_authorities, :validate => :path, :list => true
+
+  # Options to verify the server's certificate.
+  # "full": validates that the provided certificate has an issue date that’s within the not_before and not_after dates;
+  # chains to a trusted Certificate Authority (CA); has a hostname or IP address that matches the names within the certificate.
+  # "none": performs no certificate validation. Disabling this severely compromises security (https://www.cs.utexas.edu/~shmat/shmat_ccs12.pdf)
+  config :ssl_verification_mode, :validate => %w[full none]
+
+  # The keystore path
+  config :ssl_keystore_path, :validate => :path
+
+  # The keystore password
+  config :ssl_keystore_password, :validate => :password
+
+  # Keystore type (jks or pkcs12)
+  config :ssl_keystore_type, :validate => %w[jks pkcs12]
+
+  # The truststore path
+  config :ssl_truststore_path, :validate => :path
+
+  # The truststore password
+  config :ssl_truststore_password, :validate => :password
+
+  # Truststore type (jks or pkcs12)
+  config :ssl_truststore_type, :validate => %w[jks pkcs12]
+
+  # The list of cipher suites to use, listed by priorities.
+  # Supported cipher suites vary depending on which version of Java is used.
+  config :ssl_cipher_suites, :validate => :string, :list => true
+
+  # Allowed SSL/TLS protocol versions (not passed to the HTTP client when unset)
+  config :ssl_supported_protocols, :validate => %w[TLSv1.1 TLSv1.2 TLSv1.3], :list => true
 
   public
   def initialize(*params)
@@ -95,7 +158,7 @@ def initialize(*params)
   end
 
   def register
-    @schema = Avro::Schema.parse(open_and_read(schema_uri))
+    @schema = Avro::Schema.parse(fetch_schema(schema_uri))
   end
 
   public
@@ -129,6 +192,215 @@ def encode(event)
       @on_event.call(event, Base64.strict_encode64(buffer.string))
     else
       @on_event.call(event, buffer.string)
-    end
+    end
+  end
+
+  private
+
+  # Resolves `schema_uri` into the schema definition text.
+  #
+  # `http://` and `https://` URIs are fetched from a remote schema registry,
+  # anything else is treated as a local schema and read with `URI.open`.
+  #
+  # @param uri_string [String] location of the schema
+  # @return [String] the schema definition
+  # @raise [LogStash::ConfigurationError] when the SSL settings do not match the URI scheme
+  def fetch_schema(uri_string)
+    if uri_string.start_with?('http://')
+      ssl_config_provided = original_params.keys.select {|k| k.start_with?("ssl_") && k != "ssl_enabled" }
+      if ssl_config_provided.any?
+        raise_config_error! "When SSL is disabled, the following provided parameters are not allowed: #{ssl_config_provided}"
+      end
+
+      credentials_configured = @username && @password
+      @logger.warn("Credentials are being sent over unencrypted HTTP. This may bring security risk.") if credentials_configured
+      fetch_remote_schema(uri_string)
+    elsif uri_string.start_with?('https://')
+      validate_ssl_settings!
+      fetch_remote_schema(uri_string)
+    else
+      # local schema
+      URI.open(uri_string, &:read)
+    end
+  end
+
+  # Downloads the schema from a remote schema registry through Manticore.
+  #
+  # Connection errors and non-4xx error responses are tried up to three times
+  # with an exponential backoff; 4xx responses fail immediately. When the body
+  # is a JSON object with a `schema` key that value is returned, otherwise the
+  # raw body is returned.
+  #
+  # @param uri_string [String] `http://` or `https://` URL of the schema
+  # @return [String] the schema definition
+  # @raise [BadResponseCodeError] when the response code is not 200
+  # @raise [Manticore::ManticoreException] when the request itself keeps failing
+  def fetch_remote_schema(uri_string)
+    client = nil
+    client_options = {}
+
+    if @proxy && !@proxy.empty?
+      client_options[:proxy] = @proxy.to_s
+    end
+
+    basic_auth_options = build_basic_auth
+    client_options[:auth] = basic_auth_options unless basic_auth_options.empty?
+
+    if @ssl_enabled
+      ssl_options = build_ssl_options
+      client_options[:ssl] = ssl_options unless ssl_options.empty?
+    end
+
+    client = Manticore::Client.new(client_options)
+
+    @logger.debug("Fetching schema from #{uri_string}")
+
+    max_retries = 3
+    retry_count = 0
+    body = nil
+
+    begin
+      response = client.get(uri_string).call
+
+      unless response.code == 200
+        @logger.error("Failed to fetch schema from #{uri_string}: #{response.code} - #{response.message}")
+        raise BadResponseCodeError.new(response.code, response.message, uri_string)
+      end
+
+      body = response.body
+
+      # Parse and extract schema
+      parsed = JSON.parse(body)
+      return parsed.is_a?(Hash) && parsed.has_key?('schema') ? parsed['schema'] : body
+
+    rescue JSON::ParserError
+      return body
+    rescue Manticore::ManticoreException, BadResponseCodeError => e
+      # 4xx responses are client errors: retrying cannot help, and the failure
+      # was already logged when the response was received.
+      raise if e.is_a?(BadResponseCodeError) && e.code.between?(400, 499)
+
+      # retry block
+      retry_count += 1
+      if retry_count < max_retries
+        backoff_time = 2 ** retry_count # Exponential backoff: 2s, then 4s
+        @logger.warn("Attempt #{retry_count}/#{max_retries} failed for #{uri_string}: #{e.class} - #{e.message}. Retrying in #{backoff_time}s...")
+        sleep(backoff_time)
+        retry
+      else
+        @logger.error("Failed to fetch schema from #{uri_string} after #{max_retries} attempts: #{e.class} - #{e.message}")
+        raise
+      end
+    end
+  ensure
+    client.close if client
+  end
+
+  # Builds the Manticore `:auth` options from `username` and `password`.
+  #
+  # @return [Hash] `:user` and `:password`, or an empty Hash when neither is configured
+  # @raise [LogStash::ConfigurationError] when only one of them is set, or either is empty
+  def build_basic_auth
+    return {} if !@username && !@password
+
+    raise_config_error! "`username` requires `password`" if @username && !@password
+    raise_config_error! "`password` is not allowed unless `username` is specified" if !@username && @password
+    raise_config_error! "Empty `username` or `password` is not allowed" if @username.empty? || @password.value.empty?
+
+    {:user => @username, :password => @password.value}
+  end
+
+  # Validates the `ssl_*` settings for an `https://` schema URI and applies the
+  # defaults (`ssl_enabled => true`, `ssl_verification_mode => "full"`).
+  #
+  # @raise [LogStash::ConfigurationError] on an invalid combination of settings
+  def validate_ssl_settings!
+    @ssl_enabled = true if @ssl_enabled.nil?
+    raise_config_error! "Secured #{@schema_uri} connection requires `ssl_enabled => true`." unless @ssl_enabled
+    @ssl_verification_mode = "full".freeze if @ssl_verification_mode.nil?
+
+    # optional: presenting our identity
+    raise_config_error! "`ssl_certificate` and `ssl_keystore_path` cannot be used together." if @ssl_certificate && @ssl_keystore_path
+    raise_config_error! "`ssl_certificate` requires `ssl_key`" if @ssl_certificate && !@ssl_key
+    ensure_readable_and_non_writable! "ssl_certificate", @ssl_certificate if @ssl_certificate
+
+    raise_config_error! "`ssl_key` is not allowed unless `ssl_certificate` is specified" if @ssl_key && !@ssl_certificate
+    ensure_readable_and_non_writable! "ssl_key", @ssl_key if @ssl_key
+
+    raise_config_error! "`ssl_keystore_password` is not allowed unless `ssl_keystore_path` is specified" if @ssl_keystore_password && !@ssl_keystore_path
+    raise_config_error! "`ssl_keystore_password` cannot be empty" if @ssl_keystore_password && @ssl_keystore_password.value.empty?
+    raise_config_error! "`ssl_keystore_type` is not allowed unless `ssl_keystore_path` is specified" if @ssl_keystore_type && !@ssl_keystore_path
+
+    ensure_readable_and_non_writable! "ssl_keystore_path", @ssl_keystore_path if @ssl_keystore_path
+
+    # establishing trust of the server we connect to
+    # system-provided trust requires verification mode enabled
+    if @ssl_verification_mode == "none"
+      raise_config_error! "`ssl_truststore_path` requires `ssl_verification_mode` to be `full`" if @ssl_truststore_path
+      raise_config_error! "`ssl_truststore_password` requires `ssl_truststore_path` and `ssl_verification_mode => 'full'`" if @ssl_truststore_password
+      raise_config_error! "`ssl_certificate_authorities` requires `ssl_verification_mode` to be `full`" if @ssl_certificate_authorities
+    end
+
+    raise_config_error! "`ssl_truststore_path` and `ssl_certificate_authorities` cannot be used together." if @ssl_truststore_path && @ssl_certificate_authorities
+    ensure_readable_and_non_writable! "ssl_truststore_path", @ssl_truststore_path if @ssl_truststore_path
+
+    raise_config_error! "`ssl_truststore_password` is not allowed unless `ssl_truststore_path` is specified" if !@ssl_truststore_path && @ssl_truststore_password
+    raise_config_error! "`ssl_truststore_password` cannot be empty" if @ssl_truststore_password && @ssl_truststore_password.value.empty?
+    raise_config_error! "`ssl_truststore_type` is not allowed unless `ssl_truststore_path` is specified" if @ssl_truststore_type && !@ssl_truststore_path
+
+    if !@ssl_truststore_path && @ssl_certificate_authorities&.empty?
+      raise_config_error! "`ssl_certificate_authorities` cannot be empty"
+    end
+
+    if @ssl_certificate_authorities && !@ssl_certificate_authorities.empty?
+      raise_config_error! "Multiple values on `ssl_certificate_authorities` are not supported by this plugin" if @ssl_certificate_authorities.size > 1
+      ensure_readable_and_non_writable! "ssl_certificate_authorities", @ssl_certificate_authorities.first
+    end
+  end
+
+  # Translates the `ssl_*` settings into the Manticore `:ssl` options.
+  #
+  # @return [Hash] contains only the options that were configured
+  def build_ssl_options
+    ssl_options = {}
+
+    ssl_options[:client_cert] = @ssl_certificate if @ssl_certificate
+    ssl_options[:client_key] = @ssl_key if @ssl_key
+
+    ssl_options[:ca_file] = @ssl_certificate_authorities.first if @ssl_certificate_authorities
+
+    ssl_options[:cipher_suites] = @ssl_cipher_suites if @ssl_cipher_suites
+
+    ssl_options[:verify] = :default if @ssl_verification_mode == 'full'
+    ssl_options[:verify] = :disable if @ssl_verification_mode == 'none'
+
+    ssl_options[:keystore] = @ssl_keystore_path if @ssl_keystore_path
+    ssl_options[:keystore_password] = @ssl_keystore_password.value if @ssl_keystore_path && @ssl_keystore_password
+    ssl_options[:keystore_type] = @ssl_keystore_type.downcase if @ssl_keystore_path && @ssl_keystore_type
+
+    ssl_options[:truststore] = @ssl_truststore_path if @ssl_truststore_path
+    ssl_options[:truststore_password] = @ssl_truststore_password.value if @ssl_truststore_path && @ssl_truststore_password
+    ssl_options[:truststore_type] = @ssl_truststore_type.downcase if @ssl_truststore_path && @ssl_truststore_type
+
+    ssl_options[:protocols] = @ssl_supported_protocols if @ssl_supported_protocols&.any?
+
+    ssl_options
+  end
+
+  ##
+  # @param message [String]
+  # @raise [LogStash::ConfigurationError]
+  def raise_config_error!(message)
+    raise LogStash::ConfigurationError, message
+  end
+
+  # Rejects a path that this process cannot read, or that it is able to write to.
+  #
+  # @param name [String] name of the setting, used in the error message
+  # @param path [String] path to check
+  # @raise [LogStash::ConfigurationError] when the path is unreadable or writable
+  def ensure_readable_and_non_writable!(name, path)
+    raise_config_error! "Specified #{name} #{path} path must be readable." unless File.readable?(path)
+    raise_config_error! "Specified #{name} #{path} path must not be writable." if File.writable?(path)
   end
 end
diff --git a/logstash-codec-avro.gemspec b/logstash-codec-avro.gemspec
index de5eccf..7fda54c 100644
--- a/logstash-codec-avro.gemspec
+++ b/logstash-codec-avro.gemspec
@@ -1,7 +1,7 @@
 Gem::Specification.new do |s|
 
   s.name            = 'logstash-codec-avro'
-  s.version         = '3.4.1'
+  s.version         = '3.5.0'
   s.platform        = 'java'
   s.licenses        = ['Apache-2.0']
   s.summary         = "Reads serialized Avro records as Logstash events"
@@ -23,6 +23,7 @@ Gem::Specification.new do |s|
   # Gem dependencies
   s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
   s.add_runtime_dependency "avro", "~> 1.10.2" #(Apache 2.0 license)
+  s.add_runtime_dependency "manticore", '>= 0.8.0', '< 1.0.0'
   s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~> 1.3'
   s.add_runtime_dependency 'logstash-mixin-event_support', '~> 1.0'
   s.add_runtime_dependency 'logstash-mixin-validator_support', '~> 1.0'
diff --git a/spec/codecs/avro_spec.rb b/spec/codecs/avro_spec.rb
deleted file mode 100644
index a584f80..0000000
--- a/spec/codecs/avro_spec.rb
+++ /dev/null
@@ -1,203 +0,0 @@
-# encoding: utf-8
-require 'logstash/devutils/rspec/spec_helper'
-require 'insist'
-require 'avro'
-require 'base64'
-require 'logstash/codecs/avro'
-require 'logstash/event'
-require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper'
-
-describe LogStash::Codecs::Avro, :ecs_compatibility_support, :aggregate_failures do
-
-  ecs_compatibility_matrix(:disabled, :v1, :v8 => :v1) do |ecs_select|
-    before(:each) do
-      allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility)
-    end
-
-    context "non binary data" do
-      let (:avro_config) {{ 'schema_uri' => '
-                        {"type": "record", "name": "Test",
-                        "fields": [{"name": "foo", "type": ["null", "string"]},
-                                   {"name": "bar", "type": "int"}]}' }}
-      let (:test_event_hash) { { "foo" => "hello", "bar" => 10 } }
-      let (:test_event) {LogStash::Event.new(test_event_hash)}
-
-      subject do
-
allow_any_instance_of(LogStash::Codecs::Avro).to \ - receive(:open_and_read).and_return(avro_config['schema_uri']) - next LogStash::Codecs::Avro.new(avro_config) - end - - context "#decode" do - it "should return an LogStash::Event from raw and base64 encoded avro data" do - schema = Avro::Schema.parse(avro_config['schema_uri']) - dw = Avro::IO::DatumWriter.new(schema) - buffer = StringIO.new - encoder = Avro::IO::BinaryEncoder.new(buffer) - dw.write(test_event.to_hash, encoder) - - subject.decode(Base64.strict_encode64(buffer.string)) do |event| - insist {event.is_a? LogStash::Event} - insist {event.get("foo")} == test_event.get("foo") - insist {event.get("bar")} == test_event.get("bar") - expect(event.get('[event][original]')).to eq(Base64.strict_encode64(buffer.string)) if ecs_compatibility != :disabled - end - subject.decode(buffer.string) do |event| - insist {event.is_a? LogStash::Event} - insist {event.get("foo")} == test_event.get("foo") - insist {event.get("bar")} == test_event.get("bar") - expect(event.get('[event][original]')).to eq(buffer.string) if ecs_compatibility != :disabled - end - end - - it "should throw exception if decoding fails" do - expect {subject.decode("not avro") {|_| }}.to raise_error NoMethodError - end - end - - context "with binary encoding" do - let (:avro_config) { super().merge('encoding' => 'binary') } - - it "should return an LogStash::Event from raw and base64 encoded avro data" do - schema = Avro::Schema.parse(avro_config['schema_uri']) - dw = Avro::IO::DatumWriter.new(schema) - buffer = StringIO.new - encoder = Avro::IO::BinaryEncoder.new(buffer) - dw.write(test_event.to_hash, encoder) - - subject.decode(buffer.string) do |event| - expect(event).to be_a_kind_of(LogStash::Event) - expect(event.get("foo")).to eq(test_event.get("foo")) - expect(event.get("bar")).to eq(test_event.get("bar")) - expect(event.get('[event][original]')).to eq(buffer.string) if ecs_compatibility != :disabled - end - end - - it "should raise an error if 
base64 encoded data is provided" do - schema = Avro::Schema.parse(avro_config['schema_uri']) - dw = Avro::IO::DatumWriter.new(schema) - buffer = StringIO.new - encoder = Avro::IO::BinaryEncoder.new(buffer) - dw.write(test_event.to_hash, encoder) - - expect {subject.decode(Base64.strict_encode64(buffer.string))}.to raise_error - end - end - - context "#decode with tag_on_failure" do - let (:avro_config) {{ 'schema_uri' => ' - {"type": "record", "name": "Test", - "fields": [{"name": "foo", "type": ["null", "string"]}, - {"name": "bar", "type": "int"}]}', - 'tag_on_failure' => true}} - - it "should tag event on failure" do - subject.decode("not avro") do |event| - insist {event.is_a? LogStash::Event} - insist {event.get("tags")} == ["_avroparsefailure"] - end - end - end - - context "#decode with target" do - let(:avro_target) { "avro_target" } - let (:avro_config) {{ 'schema_uri' => ' - {"type": "record", "name": "Test", - "fields": [{"name": "foo", "type": ["null", "string"]}, - {"name": "bar", "type": "int"}]}', - 'target' => avro_target}} - - it "should return an LogStash::Event with content in target" do - schema = Avro::Schema.parse(avro_config['schema_uri']) - dw = Avro::IO::DatumWriter.new(schema) - buffer = StringIO.new - encoder = Avro::IO::BinaryEncoder.new(buffer) - dw.write(test_event.to_hash, encoder) - - subject.decode(buffer.string) do |event| - insist {event.get("[#{avro_target}][foo]")} == test_event.get("foo") - insist {event.get("[#{avro_target}][bar]")} == test_event.get("bar") - end - end - end - - context "#encode" do - it "should return avro data from a LogStash::Event" do - got_event = false - subject.on_event do |event, data| - schema = Avro::Schema.parse(avro_config['schema_uri']) - datum = StringIO.new(Base64.strict_decode64(data)) - decoder = Avro::IO::BinaryDecoder.new(datum) - datum_reader = Avro::IO::DatumReader.new(schema) - record = datum_reader.read(decoder) - - insist {record["foo"]} == test_event.get("foo") - insist {record["bar"]} 
== test_event.get("bar") - insist {event.is_a? LogStash::Event} - got_event = true - end - subject.encode(test_event) - insist {got_event} - end - - context "with binary encoding" do - let (:avro_config) { super().merge('encoding' => 'binary') } - - it "should return avro data from a LogStash::Event not base64 encoded" do - got_event = false - subject.on_event do |event, data| - schema = Avro::Schema.parse(avro_config['schema_uri']) - datum = StringIO.new(data) - decoder = Avro::IO::BinaryDecoder.new(datum) - datum_reader = Avro::IO::DatumReader.new(schema) - record = datum_reader.read(decoder) - - expect(event).to be_a_kind_of(LogStash::Event) - expect(event.get("foo")).to eq(test_event.get("foo")) - expect(event.get("bar")).to eq(test_event.get("bar")) - got_event = true - end - subject.encode(test_event) - expect(got_event).to be true - end - end - - context "binary data" do - - let (:avro_config) {{ 'schema_uri' => '{"namespace": "com.systems.test.data", - "type": "record", - "name": "TestRecord", - "fields": [ - {"name": "name", "type": ["string", "null"]}, - {"name": "longitude", "type": ["double", "null"]}, - {"name": "latitude", "type": ["double", "null"]} - ] - }' }} - let (:test_event) {LogStash::Event.new({ "name" => "foo", "longitude" => 21.01234.to_f, "latitude" => 111.0123.to_f })} - - subject do - allow_any_instance_of(LogStash::Codecs::Avro).to \ - receive(:open_and_read).and_return(avro_config['schema_uri']) - next LogStash::Codecs::Avro.new(avro_config) - end - - it "should correctly encode binary data" do - schema = Avro::Schema.parse(avro_config['schema_uri']) - dw = Avro::IO::DatumWriter.new(schema) - buffer = StringIO.new - encoder = Avro::IO::BinaryEncoder.new(buffer) - dw.write(test_event.to_hash, encoder) - - subject.decode(Base64.strict_encode64(buffer.string)) do |event| - insist {event.is_a? 
LogStash::Event} - insist {event.get("name")} == test_event.get("name") - insist {event.get("longitude")} == test_event.get("longitude") - insist {event.get("latitude")} == test_event.get("latitude") - end - end - end - end - - end - end -end diff --git a/spec/integration/avro_integration_spec.rb b/spec/integration/avro_integration_spec.rb new file mode 100644 index 0000000..660e21f --- /dev/null +++ b/spec/integration/avro_integration_spec.rb @@ -0,0 +1,431 @@ +# encoding: utf-8 +require 'logstash/devutils/rspec/spec_helper' +require 'logstash/codecs/avro' +require 'logstash/event' +require 'avro' +require 'base64' +require 'manticore' + +describe "Avro Codec Integration Tests", :integration => true do + INTEGRATION_DIR = File.expand_path('../', __FILE__) + + let(:test_schema) do + { + "type" => "record", + "name" => "TestRecord", + "namespace" => "com.example", + "fields" => [ + { "name" => "message", "type" => "string" }, + { "name" => "timestamp", "type" => "long" } + ] + } + end + let(:test_schema_json) { test_schema.to_json } + let(:test_event_data) do + { + "message" => "test message", + "timestamp" => Time.now.to_i + } + end + let(:config) { {} } + let(:codec) { LogStash::Codecs::Avro.new(config).tap { |c| c.register } } + + def run_integration_script(script_name) + Dir.chdir(INTEGRATION_DIR) do + result = system("./#{script_name}") + puts "Script #{script_name} #{result ? 
'succeeded' : 'failed'}" + result + end + end + + def encode_avro_data(schema_json, data) + schema = Avro::Schema.parse(schema_json) + dw = Avro::IO::DatumWriter.new(schema) + buffer = StringIO.new + encoder = Avro::IO::BinaryEncoder.new(buffer) + dw.write(data, encoder) + Base64.strict_encode64(buffer.string) + end + + def decode_with_codec(codec, encoded_data) + events = [] + codec.decode(encoded_data) do |event| + events << event + end + events + end + + def expect_decoded_event_matches(events, expected_data) + expect(events.size).to eq(1) + expect(events.first.get("message")).to eq(expected_data["message"]) + expect(events.first.get("timestamp")).to eq(expected_data["timestamp"]) + end + + def create_manticore_client(username: nil, password: nil, ssl_options: {}) + client_options = {} + if username && password + client_options[:auth] = { user: username, password: password } + end + client_options[:ssl] = ssl_options unless ssl_options.empty? + + Manticore::Client.new(client_options) + end + + def wait_for_schema_registry(url, username: nil, password: nil, ssl_options: {}) + puts "Waiting for Schema Registry at #{url}..." 
+ client = create_manticore_client(username: username, password: password, ssl_options: ssl_options) + + Stud.try(20.times, [Manticore::SocketException, StandardError, RSpec::Expectations::ExpectationNotMetError]) do + response = client.get(url).call + expect(response.code).to eq(200) + end + client.close + end + + def register_schema(base_url, subject, schema_json, username: nil, password: nil, ssl_options: {}) + client = create_manticore_client(username: username, password: password, ssl_options: ssl_options) + response = client.post("#{base_url}/subjects/#{subject}/versions", + headers: { "Content-Type" => "application/vnd.schemaregistry.v1+json" }, + body: { schema: schema_json }.to_json + ).call + expect(response.code).to eq(200) + client.close + end + + context "Schema Registry without authentication" do + let(:schema_registry_url) { "http://localhost:8081" } + + before(:all) do + run_integration_script("start_schema_registry.sh") + wait_for_schema_registry("http://localhost:8081") + end + + after(:all) do + run_integration_script("stop_schema_registry.sh") + end + + context "fetching schema via HTTP" do + let(:schema_subject) { "test-no-auth-#{Time.now.to_i}" } + let(:full_schema_url) do + url = "#{schema_registry_url}/subjects/#{schema_subject}/versions/latest" + puts "Constructed schema URL: #{url}" + url + end + + let(:config) { super().merge({ 'schema_uri' => full_schema_url }) } + + before do + register_schema(schema_registry_url, schema_subject, test_schema_json) + end + + it "fetches and decodes schema from Schema Registry" do + encoded_data = encode_avro_data(test_schema_json, test_event_data) + events = decode_with_codec(codec, encoded_data) + expect_decoded_event_matches(events, test_event_data) + end + + it "encodes data using schema from schema registry" do + event = LogStash::Event.new(test_event_data) + encoded_data = nil + + codec.on_event do |e, data| + encoded_data = data + end + + codec.encode(event) + + expect(encoded_data).not_to be_nil + 
+ events = decode_with_codec(codec, encoded_data) + expect_decoded_event_matches(events, test_event_data) + end + end + end + + context "Schema Registry with authentication" do + let(:schema_registry_url) { "http://localhost:8081" } + let(:username) { "barney" } + let(:password) { "changeme" } + + before(:all) do + run_integration_script("stop_schema_registry.sh") + run_integration_script("start_auth_schema_registry.sh") + wait_for_schema_registry("http://localhost:8081", username: "barney", password: "changeme") + end + + after(:all) do + run_integration_script("stop_schema_registry.sh") + end + + context "with valid credentials" do + let(:schema_subject) { "test-auth-#{Time.now.to_i}" } + let(:full_schema_url) { "#{schema_registry_url}/subjects/#{schema_subject}/versions/latest" } + let(:config) { super().merge({ 'schema_uri' => full_schema_url, 'username' => username, 'password' => password }) } + + before do + register_schema(schema_registry_url, schema_subject, test_schema_json, + username: username, password: password) + end + + it "fetches schema with valid credentials" do + encoded_data = encode_avro_data(test_schema_json, test_event_data) + events = decode_with_codec(codec, encoded_data) + expect_decoded_event_matches(events, test_event_data) + end + + it "encodes data with authentication" do + event = LogStash::Event.new(test_event_data) + encoded_data = nil + + codec.on_event do |e, data| + encoded_data = data + end + + codec.encode(event) + expect(encoded_data).not_to be_nil + end + end + + context "with invalid credentials" do + let(:schema_subject) { "test-invalid-auth-#{Time.now.to_i}" } + let(:full_schema_url) { "#{schema_registry_url}/subjects/#{schema_subject}/versions/latest" } + + before do + register_schema(schema_registry_url, schema_subject, test_schema_json, + username: username, password: password) + end + + it "fails with invalid credentials" do + expect { + invalid_config = { 'schema_uri' => full_schema_url, 'username' => 'invalid', 
'password' => 'wrong' } + codec = LogStash::Codecs::Avro.new(invalid_config) + codec.register + }.to raise_error { |error| + expect(error.message).to include("Unauthorized") + } + end + end + end + + context "Schema Registry with truststore configuration" do + let(:schema_registry_https_url) { "https://localhost:8083" } + let(:truststore_path) { File.join(INTEGRATION_DIR, "tls_repository", "clienttruststore.jks") } + let(:truststore_password) { "changeit" } + let(:ca_cert_path) { File.join(INTEGRATION_DIR, "tls_repository", "schema_reg_certificate.pem") } + + before(:all) do + # Ensure non-auth registry is running (it includes HTTPS on 8083) + run_integration_script("stop_schema_registry.sh") + run_integration_script("start_schema_registry.sh") + + ssl_options = { + truststore: File.join(INTEGRATION_DIR, "tls_repository", "clienttruststore.jks"), + truststore_password: "changeit", + truststore_type: "jks", + verify: :default + } + wait_for_schema_registry("https://localhost:8083", ssl_options: ssl_options) + end + + after(:all) do + run_integration_script("stop_schema_registry.sh") + end + + context "with truststore configuration" do + let(:schema_subject) { "test-ssl-truststore-#{Time.now.to_i}" } + let(:full_schema_url) { "#{schema_registry_https_url}/subjects/#{schema_subject}/versions/latest" } + let(:config) do + super().merge({ + 'schema_uri' => full_schema_url, + 'ssl_enabled' => true, + 'ssl_truststore_path' => truststore_path, + 'ssl_truststore_password' => truststore_password, + 'ssl_truststore_type' => 'jks', + 'ssl_verification_mode' => 'full' + }) + end + + before do + ssl_options = { + truststore: truststore_path, + truststore_password: truststore_password, + truststore_type: "jks", + verify: :default + } + register_schema(schema_registry_https_url, schema_subject, test_schema_json, + ssl_options: ssl_options) + end + + it "fetches schema using truststore" do + encoded_data = encode_avro_data(test_schema_json, test_event_data) + events = 
decode_with_codec(codec, encoded_data) + expect_decoded_event_matches(events, test_event_data) + end + end + + context "with CA certificate configuration" do + let(:schema_subject) { "test-ssl-ca-#{Time.now.to_i}" } + let(:full_schema_url) { "#{schema_registry_https_url}/subjects/#{schema_subject}/versions/latest" } + let(:config) do + super().merge({ + 'schema_uri' => full_schema_url, + 'ssl_enabled' => true, + 'ssl_certificate_authorities' => [ca_cert_path], + 'ssl_verification_mode' => 'full' + }) + end + + before do + ssl_options = { ca_file: ca_cert_path, verify: :default } + register_schema(schema_registry_https_url, schema_subject, test_schema_json, + ssl_options: ssl_options) + end + + it "fetches schema using CA certificate" do + encoded_data = encode_avro_data(test_schema_json, test_event_data) + events = decode_with_codec(codec, encoded_data) + expect_decoded_event_matches(events, test_event_data) + end + end + end + + context "Schema Registry with authentication and SSL" do + let(:schema_registry_https_url) { "https://localhost:8083" } + let(:username) { "barney" } + let(:password) { "changeme" } + let(:truststore_path) { File.join(INTEGRATION_DIR, "tls_repository", "clienttruststore.jks") } + let(:truststore_password) { "changeit" } + + before(:all) do + # Start authenticated registry (includes HTTPS) + run_integration_script("stop_schema_registry.sh") + run_integration_script("start_auth_schema_registry.sh") + + ssl_options = { + truststore: File.join(INTEGRATION_DIR, "tls_repository", "clienttruststore.jks"), + truststore_password: "changeit", + truststore_type: "jks", + verify: :default + } + wait_for_schema_registry("https://localhost:8083", username: "barney", password: "changeme", ssl_options: ssl_options) + end + + after(:all) do + run_integration_script("stop_schema_registry.sh") + end + + context "with valid credentials and truststore" do + let(:schema_subject) { "test-auth-ssl-#{Time.now.to_i}" } + let(:full_schema_url) { 
"#{schema_registry_https_url}/subjects/#{schema_subject}/versions/latest" } + let(:config) do + super().merge({ + 'schema_uri' => full_schema_url, + 'username' => username, + 'password' => password, + 'ssl_enabled' => true, + 'ssl_truststore_path' => truststore_path, + 'ssl_truststore_password' => truststore_password, + 'ssl_truststore_type' => 'jks', + 'ssl_verification_mode' => 'full' + }) + end + + before do + ssl_options = { + truststore: truststore_path, + truststore_password: truststore_password, + truststore_type: "jks", + verify: :default + } + register_schema(schema_registry_https_url, schema_subject, test_schema_json, + username: username, password: password, + ssl_options: ssl_options) + end + + it "fetches schema with both authentication and SSL" do + encoded_data = encode_avro_data(test_schema_json, test_event_data) + events = decode_with_codec(codec, encoded_data) + expect_decoded_event_matches(events, test_event_data) + end + + it "encodes data with authentication and SSL" do + event = LogStash::Event.new(test_event_data) + encoded_data = nil + + codec.on_event do |e, data| + encoded_data = data + end + + codec.encode(event) + expect(encoded_data).not_to be_nil + end + end + end + + context "Schema Registry with keystore configuration (mutual TLS)" do + let(:schema_registry_https_url) { "https://localhost:8083" } + + let(:truststore_path) { File.join(INTEGRATION_DIR, "tls_repository", "clienttruststore.jks") } + let(:truststore_password) { "changeit" } + let(:keystore_path) { File.join(INTEGRATION_DIR, "tls_repository", "schema_reg.jks") } + let(:keystore_password) { "changeit" } + + before(:all) do + run_integration_script("stop_schema_registry.sh") + run_integration_script("start_schema_registry_mutual.sh") + + ssl_options = { + keystore: File.join(INTEGRATION_DIR, "tls_repository", "schema_reg.jks"), + keystore_password: "changeit", + keystore_type: "jks", + truststore: File.join(INTEGRATION_DIR, "tls_repository", "clienttruststore.jks"), + 
truststore_password: "changeit", + truststore_type: "jks" + } + wait_for_schema_registry("https://localhost:8083", ssl_options: ssl_options) + end + + after(:all) do + run_integration_script("stop_schema_registry.sh") + end + + context "with keystore and truststore" do + let(:schema_subject) { "test-mutual-tls-#{Time.now.to_i}" } + let(:full_schema_url) { "#{schema_registry_https_url}/subjects/#{schema_subject}/versions/latest" } + + let(:config) do + super().merge({ + 'schema_uri' => full_schema_url, + 'ssl_enabled' => true, + 'ssl_keystore_path' => keystore_path, + 'ssl_keystore_password' => keystore_password, + 'ssl_keystore_type' => 'jks', + 'ssl_truststore_path' => truststore_path, + 'ssl_truststore_password' => truststore_password, + 'ssl_truststore_type' => 'jks', + 'ssl_verification_mode' => 'full' + }) + end + + before do + ssl_options = { + keystore: keystore_path, + keystore_password: keystore_password, + keystore_type: "jks", + truststore: truststore_path, + truststore_password: truststore_password, + truststore_type: "jks", + verify: :default + } + register_schema(schema_registry_https_url, schema_subject, test_schema_json, + ssl_options: ssl_options) + end + + it "fetches schema" do + encoded_data = encode_avro_data(test_schema_json, test_event_data) + events = decode_with_codec(codec, encoded_data) + expect_decoded_event_matches(events, test_event_data) + end + end + end +end diff --git a/spec/integration/fixtures/jaas.config b/spec/integration/fixtures/jaas.config new file mode 100644 index 0000000..f1c29ac --- /dev/null +++ b/spec/integration/fixtures/jaas.config @@ -0,0 +1,5 @@ +SchemaRegistry-Props { + org.eclipse.jetty.security.jaas.spi.PropertyFileLoginModule required + file="build/confluent_platform/etc/schema-registry/pwd" + debug="true"; +}; diff --git a/spec/integration/fixtures/pwd b/spec/integration/fixtures/pwd new file mode 100644 index 0000000..7d2a92a --- /dev/null +++ b/spec/integration/fixtures/pwd @@ -0,0 +1,2 @@ +barney: 
changeme,user,developer
+admin:admin,admin
\ No newline at end of file
diff --git a/spec/integration/fixtures/trust-store_stub.jks b/spec/integration/fixtures/trust-store_stub.jks
new file mode 100644
index 0000000..e69de29
diff --git a/spec/integration/kafka_test_setup.sh b/spec/integration/kafka_test_setup.sh
new file mode 100755
index 0000000..7d5548f
--- /dev/null
+++ b/spec/integration/kafka_test_setup.sh
@@ -0,0 +1,97 @@
+#!/bin/bash
+# Set up a local Kafka broker (KRaft) and Confluent Schema Registry configs for the
+# integration specs, then create the test topic.
+#
+# Paths are relative: run it from the directory that holds this script.
+# Optional environment: KAFKA_VERSION (default 4.1.0), CONFLUENT_VERSION (default 8.0.0).
+
+set -ex
+# check if KAFKA_VERSION env var is set
+if [ -n "${KAFKA_VERSION+1}" ]; then
+  echo "KAFKA_VERSION is $KAFKA_VERSION"
+else
+  KAFKA_VERSION=4.1.0
+fi
+
+export _JAVA_OPTIONS="-Djava.net.preferIPv4Stack=true"
+
+rm -rf build
+mkdir build
+
+echo "Setup Kafka version $KAFKA_VERSION"
+if [ ! -e "kafka_2.13-$KAFKA_VERSION.tgz" ]; then
+  echo "Kafka not present locally, downloading"
+  # -f makes HTTP errors fail the script instead of saving the error page; the .part
+  # name keeps an aborted download from being reused as a cached tarball next run.
+  curl -fsS -o "kafka_2.13-$KAFKA_VERSION.tgz.part" "https://downloads.apache.org/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz"
+  mv "kafka_2.13-$KAFKA_VERSION.tgz.part" "kafka_2.13-$KAFKA_VERSION.tgz"
+fi
+cp "kafka_2.13-$KAFKA_VERSION.tgz" "build/kafka.tgz"
+# Separate commands rather than `mkdir && tar`: set -e ignores a failure on the left of &&.
+mkdir "build/kafka"
+tar xzf "build/kafka.tgz" -C "build/kafka" --strip-components 1
+
+echo "Use KRaft for Kafka version $KAFKA_VERSION"
+echo "log.dirs=${PWD}/build/kafka-logs" >> build/kafka/config/server.properties
+
+build/kafka/bin/kafka-storage.sh format \
+  --cluster-id "$(build/kafka/bin/kafka-storage.sh random-uuid)" \
+  --config build/kafka/config/server.properties \
+  --ignore-formatted \
+  --standalone
+
+echo "Starting Kafka broker"
+build/kafka/bin/kafka-server-start.sh -daemon "build/kafka/config/server.properties" --override advertised.host.name=127.0.0.1 --override log.dirs="${PWD}/build/kafka-logs"
+sleep 10
+
+echo "Setup Confluent Platform"
+# check if CONFLUENT_VERSION env var is set
+if [ -n "${CONFLUENT_VERSION+1}" ]; then
+  echo "CONFLUENT_VERSION is $CONFLUENT_VERSION"
+else
+  CONFLUENT_VERSION=8.0.0
+fi
+if [ ! -e "confluent-community-$CONFLUENT_VERSION.tar.gz" ]; then
+  echo "Confluent Platform not present locally, downloading"
+  # major.minor part of the version (8.0 for 8.0.0); the archive URL is keyed by it
+  CONFLUENT_MINOR=$(echo "$CONFLUENT_VERSION" | sed -n 's/^\([[:digit:]]*\.[[:digit:]]*\)\.[[:digit:]]*$/\1/p')
+  echo "CONFLUENT_MINOR is $CONFLUENT_MINOR"
+  curl -fsS -o "confluent-community-$CONFLUENT_VERSION.tar.gz.part" "https://packages.confluent.io/archive/$CONFLUENT_MINOR/confluent-community-$CONFLUENT_VERSION.tar.gz"
+  mv "confluent-community-$CONFLUENT_VERSION.tar.gz.part" "confluent-community-$CONFLUENT_VERSION.tar.gz"
+fi
+echo "Extracting confluent-community-$CONFLUENT_VERSION.tar.gz to build"
+mkdir "build/confluent_platform"
+tar xzf "confluent-community-$CONFLUENT_VERSION.tar.gz" -C "build/confluent_platform" --strip-components 1
+
+echo "Configuring TLS on Schema registry"
+rm -Rf tls_repository
+mkdir tls_repository
+./setup_keystore_and_truststore.sh
+# configure schema-registry to handle https on 8083 port
+if [[ "$OSTYPE" == "darwin"* ]]; then
+  sed -i '' 's/http:\/\/0.0.0.0:8081/http:\/\/0.0.0.0:8081, https:\/\/0.0.0.0:8083/g' "build/confluent_platform/etc/schema-registry/schema-registry.properties"
+else
+  sed -i 's/http:\/\/0.0.0.0:8081/http:\/\/0.0.0.0:8081, https:\/\/0.0.0.0:8083/g' "build/confluent_platform/etc/schema-registry/schema-registry.properties"
+fi
+echo "ssl.keystore.location=${PWD}/tls_repository/schema_reg.jks" >> "build/confluent_platform/etc/schema-registry/schema-registry.properties"
+echo "ssl.keystore.password=changeit" >> "build/confluent_platform/etc/schema-registry/schema-registry.properties"
+echo "ssl.key.password=changeit" >> "build/confluent_platform/etc/schema-registry/schema-registry.properties"
+
+# mutual TLS variant: same config plus a truststore and mandatory client certificates
+cp "build/confluent_platform/etc/schema-registry/schema-registry.properties" "build/confluent_platform/etc/schema-registry/schema-registry-mutual.properties"
+echo "ssl.truststore.location=${PWD}/tls_repository/clienttruststore.jks" >> "build/confluent_platform/etc/schema-registry/schema-registry-mutual.properties"
+echo "ssl.truststore.password=changeit" >> "build/confluent_platform/etc/schema-registry/schema-registry-mutual.properties"
+echo "confluent.http.server.ssl.client.authentication=REQUIRED" >> "build/confluent_platform/etc/schema-registry/schema-registry-mutual.properties"
+
+# basic auth variant: same config plus the JAAS realm defined in fixtures/jaas.config
+cp "build/confluent_platform/etc/schema-registry/schema-registry.properties" "build/confluent_platform/etc/schema-registry/authed-schema-registry.properties"
+echo "authentication.method=BASIC" >> "build/confluent_platform/etc/schema-registry/authed-schema-registry.properties"
+echo "authentication.roles=admin,developer,user" >> "build/confluent_platform/etc/schema-registry/authed-schema-registry.properties"
+echo "authentication.realm=SchemaRegistry-Props" >> "build/confluent_platform/etc/schema-registry/authed-schema-registry.properties"
+cp fixtures/jaas.config "build/confluent_platform/etc/schema-registry"
+
+echo "Setting up a test topic"
+build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic_plain --bootstrap-server localhost:9092
+
+cp fixtures/pwd "build/confluent_platform/etc/schema-registry"
+echo "Setup complete, running specs"
diff --git a/spec/integration/kafka_test_teardown.sh b/spec/integration/kafka_test_teardown.sh
new file mode 100755
index 0000000..619d66a
--- /dev/null
+++ b/spec/integration/kafka_test_teardown.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+# Stop the Kafka broker started by kafka_test_setup.sh and remove generated TLS files.
+# Paths are relative: run it from the directory that holds this script.
+set -ex
+
+# Remove the TLS material on every exit path: a failing stop command under
+# `set -e` would otherwise end the script before the cleanup is reached.
+cleanup_tls() {
+  echo "Clean TLS folder"
+  rm -Rf tls_repository
+}
+trap cleanup_tls EXIT
+
+echo "Unregistering test topics"
+#build/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 'topic_avro.*' 2>&1
+
+echo "Stopping Kafka broker"
+build/kafka/bin/kafka-server-stop.sh
+
+if [ -f "build/kafka/bin/zookeeper-server-stop.sh" ]; then
+  echo "Stopping ZooKeeper"
+  build/kafka/bin/zookeeper-server-stop.sh
+fi
diff --git a/spec/integration/setup_keystore_and_truststore.sh b/spec/integration/setup_keystore_and_truststore.sh
new file mode 100755
index 0000000..64f8b49
--- /dev/null
+++ b/spec/integration/setup_keystore_and_truststore.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+# Setup Schema Registry keystore and Kafka's schema registry client's truststore
+# Writes into ./tls_repository, which is expected to exist (kafka_test_setup.sh creates it).
+set -ex
+
+echo "Generating schema registry key store"
+keytool -genkey -alias schema_reg -keyalg RSA -keystore tls_repository/schema_reg.jks -keypass changeit -storepass changeit -validity 365 -keysize 2048 -dname "CN=localhost, OU=John Doe, O=Acme Inc, L=Unknown, ST=Unknown, C=IT"
+
+echo "Exporting schema registry certificate"
+keytool -exportcert -rfc -keystore tls_repository/schema_reg.jks -storepass changeit -alias schema_reg -file tls_repository/schema_reg_certificate.pem
+
+echo "Creating client's truststore and importing schema registry's certificate"
+keytool -import -trustcacerts -file tls_repository/schema_reg_certificate.pem -keypass changeit -storepass changeit -keystore tls_repository/clienttruststore.jks -noprompt
+
+# make files read only
+chmod 444 tls_repository/schema_reg.jks
+chmod 444 tls_repository/schema_reg_certificate.pem
+chmod 444 tls_repository/clienttruststore.jks
\ No newline at end of file
diff --git a/spec/integration/start_auth_schema_registry.sh b/spec/integration/start_auth_schema_registry.sh
new file mode 100755
index 0000000..c065558
--- /dev/null
+++ b/spec/integration/start_auth_schema_registry.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+set -ex
+
+# Background start with output discarded; callers need to poll the registry to know it is up.
+echo "Starting authed SchemaRegistry"
+SCHEMA_REGISTRY_OPTS="-Djava.security.auth.login.config=build/confluent_platform/etc/schema-registry/jaas.config" \
+  build/confluent_platform/bin/schema-registry-start \
+  build/confluent_platform/etc/schema-registry/authed-schema-registry.properties \
+  > /dev/null 2>&1 &
\ No newline at end of file
diff --git a/spec/integration/start_schema_registry.sh b/spec/integration/start_schema_registry.sh
new file mode 100755
index 0000000..9a86afc
--- /dev/null
+++ b/spec/integration/start_schema_registry.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+set -ex
+
+# Background start with output discarded; callers need to poll the registry to know it is up.
+echo "Starting SchemaRegistry"
+build/confluent_platform/bin/schema-registry-start build/confluent_platform/etc/schema-registry/schema-registry.properties > /dev/null 2>&1 &
diff --git a/spec/integration/start_schema_registry_mutual.sh b/spec/integration/start_schema_registry_mutual.sh
new file mode 100755
index 0000000..be9b6be
--- /dev/null
+++ b/spec/integration/start_schema_registry_mutual.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+set -ex
+
+# Background start with output discarded; callers need to poll the registry to know it is up.
+echo "Starting SchemaRegistry"
+build/confluent_platform/bin/schema-registry-start build/confluent_platform/etc/schema-registry/schema-registry-mutual.properties > /dev/null 2>&1 &
diff --git a/spec/integration/stop_schema_registry.sh b/spec/integration/stop_schema_registry.sh
new file mode 100755
index 0000000..7b402bd
--- /dev/null
+++ b/spec/integration/stop_schema_registry.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+set -ex
+
+echo "Stopping SchemaRegistry"
+build/confluent_platform/bin/schema-registry-stop
+# presumably gives the process time to exit and release its ports before a restart
+sleep 2
\ No newline at end of file
diff --git a/spec/unit/avro_spec.rb b/spec/unit/avro_spec.rb
new file mode 100644
index 0000000..d3d8e83
--- /dev/null
+++ b/spec/unit/avro_spec.rb
@@ -0,0 +1,866 @@
+# encoding: utf-8
+require 'logstash/devutils/rspec/spec_helper'
+require 'insist'
+require 'avro'
+require 'base64'
+require 'logstash/codecs/avro'
+require 'logstash/event'
+require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper'
+
+describe LogStash::Codecs::Avro, :ecs_compatibility_support, :aggregate_failures do
+  let(:paths) do
+    {
+      # path has to be created, otherwise config :path validation fails
+      # and since we cannot control the chmod operations on paths, we should stub file readable? and writable?
operations + :test_path => "spec/unit/resources/do_not_remove_path" + } + end + + ecs_compatibility_matrix(:disabled, :v1, :v8 => :v1) do |ecs_select| + before(:each) do + allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility) + end + + context "non binary data" do + let (:avro_config) {{ 'schema_uri' => ' + {"type": "record", "name": "Test", + "fields": [{"name": "foo", "type": ["null", "string"]}, + {"name": "bar", "type": "int"}]}' }} + let (:test_event_hash) { { "foo" => "hello", "bar" => 10 } } + let (:test_event) {LogStash::Event.new(test_event_hash)} + + subject do + allow_any_instance_of(LogStash::Codecs::Avro).to \ + receive(:fetch_schema).and_return(avro_config['schema_uri']) + next LogStash::Codecs::Avro.new(avro_config) + end + + context "#decode" do + it "should return an LogStash::Event from raw and base64 encoded avro data" do + schema = Avro::Schema.parse(avro_config['schema_uri']) + dw = Avro::IO::DatumWriter.new(schema) + buffer = StringIO.new + encoder = Avro::IO::BinaryEncoder.new(buffer) + dw.write(test_event.to_hash, encoder) + + subject.decode(Base64.strict_encode64(buffer.string)) do |event| + insist {event.is_a? LogStash::Event} + insist {event.get("foo")} == test_event.get("foo") + insist {event.get("bar")} == test_event.get("bar") + expect(event.get('[event][original]')).to eq(Base64.strict_encode64(buffer.string)) if ecs_compatibility != :disabled + end + subject.decode(buffer.string) do |event| + insist {event.is_a? 
LogStash::Event} + insist {event.get("foo")} == test_event.get("foo") + insist {event.get("bar")} == test_event.get("bar") + expect(event.get('[event][original]')).to eq(buffer.string) if ecs_compatibility != :disabled + end + end + + it "should throw exception if decoding fails" do + expect {subject.decode("not avro") {|_| }}.to raise_error NoMethodError + end + end + + context "with binary encoding" do + let (:avro_config) { super().merge('encoding' => 'binary') } + + it "should return an LogStash::Event from raw and base64 encoded avro data" do + schema = Avro::Schema.parse(avro_config['schema_uri']) + dw = Avro::IO::DatumWriter.new(schema) + buffer = StringIO.new + encoder = Avro::IO::BinaryEncoder.new(buffer) + dw.write(test_event.to_hash, encoder) + + subject.decode(buffer.string) do |event| + expect(event).to be_a_kind_of(LogStash::Event) + expect(event.get("foo")).to eq(test_event.get("foo")) + expect(event.get("bar")).to eq(test_event.get("bar")) + expect(event.get('[event][original]')).to eq(buffer.string) if ecs_compatibility != :disabled + end + end + + it "should raise an error if base64 encoded data is provided" do + schema = Avro::Schema.parse(avro_config['schema_uri']) + dw = Avro::IO::DatumWriter.new(schema) + buffer = StringIO.new + encoder = Avro::IO::BinaryEncoder.new(buffer) + dw.write(test_event.to_hash, encoder) + + expect {subject.decode(Base64.strict_encode64(buffer.string))}.to raise_error + end + end + + context "#decode with tag_on_failure" do + let (:avro_config) {{ 'schema_uri' => ' + {"type": "record", "name": "Test", + "fields": [{"name": "foo", "type": ["null", "string"]}, + {"name": "bar", "type": "int"}]}', + 'tag_on_failure' => true}} + + it "should tag event on failure" do + subject.decode("not avro") do |event| + insist {event.is_a? 
LogStash::Event} + insist {event.get("tags")} == ["_avroparsefailure"] + end + end + end + + context "#decode with target" do + let(:avro_target) { "avro_target" } + let (:avro_config) {{ 'schema_uri' => ' + {"type": "record", "name": "Test", + "fields": [{"name": "foo", "type": ["null", "string"]}, + {"name": "bar", "type": "int"}]}', + 'target' => avro_target}} + + it "should return an LogStash::Event with content in target" do + schema = Avro::Schema.parse(avro_config['schema_uri']) + dw = Avro::IO::DatumWriter.new(schema) + buffer = StringIO.new + encoder = Avro::IO::BinaryEncoder.new(buffer) + dw.write(test_event.to_hash, encoder) + + subject.decode(buffer.string) do |event| + insist {event.get("[#{avro_target}][foo]")} == test_event.get("foo") + insist {event.get("[#{avro_target}][bar]")} == test_event.get("bar") + end + end + end + + context "#encode" do + it "should return avro data from a LogStash::Event" do + got_event = false + subject.on_event do |event, data| + schema = Avro::Schema.parse(avro_config['schema_uri']) + datum = StringIO.new(Base64.strict_decode64(data)) + decoder = Avro::IO::BinaryDecoder.new(datum) + datum_reader = Avro::IO::DatumReader.new(schema) + record = datum_reader.read(decoder) + + insist {record["foo"]} == test_event.get("foo") + insist {record["bar"]} == test_event.get("bar") + insist {event.is_a? 
LogStash::Event} + got_event = true + end + subject.encode(test_event) + insist {got_event} + end + + context "with binary encoding" do + let (:avro_config) { super().merge('encoding' => 'binary') } + + it "should return avro data from a LogStash::Event not base64 encoded" do + got_event = false + subject.on_event do |event, data| + schema = Avro::Schema.parse(avro_config['schema_uri']) + datum = StringIO.new(data) + decoder = Avro::IO::BinaryDecoder.new(datum) + datum_reader = Avro::IO::DatumReader.new(schema) + record = datum_reader.read(decoder) + + expect(event).to be_a_kind_of(LogStash::Event) + expect(event.get("foo")).to eq(test_event.get("foo")) + expect(event.get("bar")).to eq(test_event.get("bar")) + got_event = true + end + subject.encode(test_event) + expect(got_event).to be true + end + end + + context "binary data" do + + let (:avro_config) {{ 'schema_uri' => '{"namespace": "com.systems.test.data", + "type": "record", + "name": "TestRecord", + "fields": [ + {"name": "name", "type": ["string", "null"]}, + {"name": "longitude", "type": ["double", "null"]}, + {"name": "latitude", "type": ["double", "null"]} + ] + }' }} + let (:test_event) {LogStash::Event.new({ "name" => "foo", "longitude" => 21.01234.to_f, "latitude" => 111.0123.to_f })} + + subject do + allow_any_instance_of(LogStash::Codecs::Avro).to \ + receive(:fetch_schema).and_return(avro_config['schema_uri']) + next LogStash::Codecs::Avro.new(avro_config) + end + + it "should correctly encode binary data" do + schema = Avro::Schema.parse(avro_config['schema_uri']) + dw = Avro::IO::DatumWriter.new(schema) + buffer = StringIO.new + encoder = Avro::IO::BinaryEncoder.new(buffer) + dw.write(test_event.to_hash, encoder) + + subject.decode(Base64.strict_encode64(buffer.string)) do |event| + insist {event.is_a? 
LogStash::Event} + insist {event.get("name")} == test_event.get("name") + insist {event.get("longitude")} == test_event.get("longitude") + insist {event.get("latitude")} == test_event.get("latitude") + end + end + end + end + + end + end + + context "remote schema registry" do + let(:test_schema) do + '{"type": "record", "name": "Test", + "fields": [{"name": "foo", "type": ["null", "string"]}, + {"name": "bar", "type": "int"}]}' + end + + subject do + allow_any_instance_of(LogStash::Codecs::Avro).to receive(:fetch_schema).and_return(test_schema) + next LogStash::Codecs::Avro.new(avro_config) + end + + context "basic authentication" do + + context "with both username and password" do + let(:avro_config) do + { + 'schema_uri' => 'http://schema-registry.example.com/schema.avsc', + 'username' => 'test_user', + 'password' => 'test_&^%$!password' + } + end + + it "uses user and password" do + auth = subject.send(:build_basic_auth) + expect(auth).to eq({:user => 'test_user', :password => 'test_&^%$!password'}) + end + + it "includes valid credentials in auth hash" do + auth = subject.send(:build_basic_auth) + expect(auth[:user]).not_to be_empty + expect(auth[:password]).not_to be_empty + end + end + + context "with only username" do + let(:avro_config) do + { + 'schema_uri' => 'http://schema-registry.example.com/schema.avsc', + 'username' => 'test_user' + } + end + + it "raises ConfigurationError" do + expect { subject.send(:build_basic_auth) }.to raise_error(LogStash::ConfigurationError, /`username` requires `password`/) + end + end + + context "with only password" do + let(:avro_config) do + { + 'schema_uri' => 'http://schema-registry.example.com/schema.avsc', + 'password' => 'test_&^%$!password' + } + end + + it "raises ConfigurationError" do + expect { subject.send(:build_basic_auth) }.to raise_error(LogStash::ConfigurationError, /`password` is not allowed unless `username` is specified/) + end + end + + context "with empty username" do + let(:avro_config) do + { + 
'schema_uri' => 'http://schema-registry.example.com/schema.avsc', + 'username' => '', + 'password' => 'test_&^%$!password' + } + end + + it "raises ConfigurationError" do + expect { subject.send(:build_basic_auth) }.to raise_error(LogStash::ConfigurationError, /Empty `username` or `password` is not allowed/) + end + end + + context "with empty password" do + let(:avro_config) do + { + 'schema_uri' => 'http://schema-registry.example.com/schema.avsc', + 'username' => 'test_user', + 'password' => '' + } + end + + it "raises ConfigurationError" do + expect { subject.send(:build_basic_auth) }.to raise_error(LogStash::ConfigurationError, /Empty `username` or `password` is not allowed/) + end + end + + context "with neither username nor password" do + let(:avro_config) do + { + 'schema_uri' => 'http://schema-registry.example.com/schema.avsc' + } + end + + it "returns empty hash" do + auth = subject.send(:build_basic_auth) + expect(auth).to be_empty + end + end + + context "with unsecure connection and credentials" do + let(:avro_config) do + { + 'schema_uri' => 'http://schema-registry.example.com/schema.avsc', + 'username' => 'test_user', + 'password' => 'test_&^%$!password' + } + end + + it "still returns valid auth hash" do + allow(subject.logger).to receive(:warn) + auth = subject.send(:build_basic_auth) + expect(auth).to eq({:user => 'test_user', :password => 'test_&^%$!password'}) + end + end + end + + context "secured connection against schema registry" do + + before do + allow(File).to receive(:readable?).and_return(true) + allow(File).to receive(:writable?).and_return(false) + end + + context "explicit and inferred SSL" do + context "with explicit ssl_enabled => true" do + let(:avro_config) do + { + 'schema_uri' => 'http://schema-registry.example.com/schema.avsc', + 'ssl_enabled' => true + } + end + + it "enables SSL" do + expect(subject.instance_variable_get(:@ssl_enabled)).to be true + end + end + + context "with HTTPS URI (inferred ssl_enabled)" do + 
let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc' + } + end + + it "automatically enables SSL for HTTPS URIs" do + subject.send(:validate_ssl_settings!) + expect(subject.instance_variable_get(:@ssl_enabled)).to be true + end + end + + context "with explicit ssl_enabled => false" do + let(:avro_config) do + { + 'schema_uri' => 'http://schema-registry.example.com/schema.avsc', + 'ssl_enabled' => false + } + end + + it "disables SSL" do + expect(subject.instance_variable_get(:@ssl_enabled)).to be false + end + end + + context "with explicit ssl_enabled => true and HTTPS URI" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_enabled' => false + } + end + + it "requires ssl_enabled => true" do + expect { subject.send(:validate_ssl_settings!) }.to raise_error( + LogStash::ConfigurationError, + /Secured https:\/\/schema-registry.example.com\/schema.avsc connection requires `ssl_enabled => true`. / + ) + end + end + end + + context "SSL verification" do + context "with ssl_verification_mode => 'full'" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_verification_mode' => 'full' + } + end + + it "sets verification mode to full" do + expect(subject.instance_variable_get(:@ssl_verification_mode)).to eq('full') + end + + it "builds SSL options with default verify mode" do + ssl_options = subject.send(:build_ssl_options) + expect(ssl_options[:verify]).to eq(:default) + end + end + + context "with ssl_verification_mode => 'none'" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_verification_mode' => 'none' + } + end + + it "sets verification mode to none" do + expect(subject.instance_variable_get(:@ssl_verification_mode)).to eq('none') + end + + it "builds SSL options with disable verify mode" do + ssl_options = subject.send(:build_ssl_options) + 
expect(ssl_options[:verify]).to eq(:disable) + end + end + + context "with default ssl_verification_mode" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc' + } + end + + it "defaults to 'full'" do + subject.send(:validate_ssl_settings!) + expect(subject.instance_variable_get(:@ssl_verification_mode)).to eq('full') + end + end + end + + context "keystore configuration" do + context "with ssl_keystore_path" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_keystore_path' => paths[:test_path], + 'ssl_keystore_password' => 'keystore_pass', + 'ssl_keystore_type' => 'jks' + } + end + + it "configures keystore options" do + ssl_options = subject.send(:build_ssl_options) + expect(ssl_options[:keystore]).to eq(paths[:test_path]) + expect(ssl_options[:keystore_password]).to eq('keystore_pass') + expect(ssl_options[:keystore_type]).to eq('jks') + end + end + + context "with ssl_keystore_path and pkcs12 type" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_keystore_path' => paths[:test_path], + 'ssl_keystore_password' => 'keystore_pass', + 'ssl_keystore_type' => 'pkcs12' + } + end + + it "configures pkcs12 keystore" do + ssl_options = subject.send(:build_ssl_options) + expect(ssl_options[:keystore_type]).to eq('pkcs12') + end + end + end + + context "truststore configuration" do + context "with ssl_truststore_path" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_truststore_path' => paths[:test_path], + 'ssl_truststore_password' => 'truststore_pass', + 'ssl_truststore_type' => 'jks' + } + end + + it "configures truststore options" do + ssl_options = subject.send(:build_ssl_options) + expect(ssl_options[:truststore]).to eq(paths[:test_path]) + expect(ssl_options[:truststore_password]).to eq('truststore_pass') + expect(ssl_options[:truststore_type]).to eq('jks') + 
end + end + + context "with ssl_truststore_path and pkcs12 type" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_truststore_path' => paths[:test_path], + 'ssl_truststore_password' => 'truststore_pass', + 'ssl_truststore_type' => 'pkcs12' + } + end + + it "configures pkcs12 truststore" do + ssl_options = subject.send(:build_ssl_options) + expect(ssl_options[:truststore_type]).to eq('pkcs12') + end + end + end + + context "CA configuration" do + + context "with single CA file" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_certificate_authorities' => [paths[:test_path]] + } + end + + it "configures CA certificate" do + ssl_options = subject.send(:build_ssl_options) + expect(ssl_options[:ca_file]).to eq(paths[:test_path]) + end + end + + context "with multiple CA files" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_certificate_authorities' => [paths[:test_path], paths[:test_path]] + } + end + + it "raises ConfigurationError for multiple CAs" do + expect { subject.send(:validate_ssl_settings!) }.to raise_error( + LogStash::ConfigurationError, + /Multiple values on `ssl_certificate_authorities` are not supported/ + ) + end + end + + context "with empty ssl_certificate_authorities" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_certificate_authorities' => [] + } + end + + it "raises ConfigurationError" do + expect { subject.send(:validate_ssl_settings!) 
}.to raise_error( + LogStash::ConfigurationError, + /`ssl_certificate_authorities` cannot be empty/ + ) + end + end + end + + context "cipher suites" do + context "with specified cipher suites" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_cipher_suites' => %w[TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256] + } + end + + it "configures cipher suites" do + ssl_options = subject.send(:build_ssl_options) + expect(ssl_options[:cipher_suites]).to eq(%w[TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]) + end + end + + context "without cipher suites" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc' + } + end + + it "does not include cipher_suites in SSL options" do + ssl_options = subject.send(:build_ssl_options) + expect(ssl_options).not_to have_key(:cipher_suites) + end + end + end + + context "supported protocols" do + context "with TLSv1.2 and TLSv1.3" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_supported_protocols' => %w[TLSv1.2 TLSv1.3] + } + end + + it "configures supported protocols" do + ssl_options = subject.send(:build_ssl_options) + expect(ssl_options[:protocols]).to eq(%w[TLSv1.2 TLSv1.3]) + end + end + + context "with only TLSv1.3" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_supported_protocols' => ['TLSv1.3'] + } + end + + it "configures only TLSv1.3" do + ssl_options = subject.send(:build_ssl_options) + expect(ssl_options[:protocols]).to eq(['TLSv1.3']) + end + end + + context "with empty ssl_supported_protocols" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_supported_protocols' => [] + } + end + + it "does not include protocols in SSL options" do + ssl_options = subject.send(:build_ssl_options) + 
expect(ssl_options).not_to have_key(:protocols) + end + end + + context "without ssl_supported_protocols" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc' + } + end + + it "uses default (no protocols key)" do + ssl_options = subject.send(:build_ssl_options) + expect(ssl_options).not_to have_key(:protocols) + end + end + end + + context "SSL validations" do + context "PEM certificate validation" do + context "when both ssl_certificate and ssl_keystore_path are set" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_certificate' => paths[:test_path], + 'ssl_key' => paths[:test_path], + 'ssl_keystore_path' => paths[:test_path], + 'ssl_keystore_password' => 'password' + } + end + + it "raises ConfigurationError" do + expect { subject.send(:validate_ssl_settings!) }.to raise_error( + LogStash::ConfigurationError, + /`ssl_certificate` and `ssl_keystore_path` cannot be used together/ + ) + end + end + + context "when ssl_certificate is set without ssl_key" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_certificate' => paths[:test_path] + } + end + + it "raises ConfigurationError" do + expect { subject.send(:validate_ssl_settings!) }.to raise_error( + LogStash::ConfigurationError, + /`ssl_certificate` requires `ssl_key`/ + ) + end + end + + context "when ssl_key is set without ssl_certificate" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_key' => paths[:test_path] + } + end + + it "raises ConfigurationError" do + expect { subject.send(:validate_ssl_settings!) 
}.to raise_error( + LogStash::ConfigurationError, + /`ssl_key` is not allowed unless `ssl_certificate` is specified/ + ) + end + end + end + + context "keystore validation" do + context "when ssl_keystore_password is set without ssl_keystore_path" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_keystore_password' => 'password' + } + end + + it "raises ConfigurationError" do + expect { subject.send(:validate_ssl_settings!) }.to raise_error( + LogStash::ConfigurationError, + /`ssl_keystore_password` is not allowed unless `ssl_keystore_path` is specified/ + ) + end + end + + context "when ssl_keystore_password is empty" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_keystore_path' => paths[:test_path], + 'ssl_keystore_password' => '' + } + end + + it "raises ConfigurationError" do + expect { subject.send(:validate_ssl_settings!) }.to raise_error( + LogStash::ConfigurationError, + /`ssl_keystore_password` cannot be empty/ + ) + end + end + + context "when ssl_keystore_type is set without ssl_keystore_path" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_keystore_type' => 'jks' + } + end + + it "raises ConfigurationError" do + expect { subject.send(:validate_ssl_settings!) }.to raise_error( + LogStash::ConfigurationError, + /`ssl_keystore_type` is not allowed unless `ssl_keystore_path` is specified/ + ) + end + end + end + + context "truststore validation" do + context "when ssl_truststore_password is set without ssl_truststore_path" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_truststore_password' => 'password' + } + end + + it "raises ConfigurationError" do + expect { subject.send(:validate_ssl_settings!) 
}.to raise_error( + LogStash::ConfigurationError, + /`ssl_truststore_password` is not allowed unless `ssl_truststore_path` is specified/ + ) + end + end + + context "when ssl_truststore_password is empty" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_truststore_path' => paths[:test_path], + 'ssl_truststore_password' => '' + } + end + + it "raises ConfigurationError" do + expect { subject.send(:validate_ssl_settings!) }.to raise_error( + LogStash::ConfigurationError, + /`ssl_truststore_password` cannot be empty/ + ) + end + end + + context "when both ssl_truststore_path and ssl_certificate_authorities are set" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_truststore_path' => paths[:test_path], + 'ssl_truststore_password' => 'password', + 'ssl_certificate_authorities' => [paths[:test_path]] + } + end + + it "raises ConfigurationError" do + expect { subject.send(:validate_ssl_settings!) }.to raise_error( + LogStash::ConfigurationError, + /`ssl_truststore_path` and `ssl_certificate_authorities` cannot be used together/ + ) + end + end + end + + context "verification mode validation" do + context "when ssl_truststore_path is set with verification mode none" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_truststore_path' => paths[:test_path], + 'ssl_truststore_password' => 'password', + 'ssl_verification_mode' => 'none' + } + end + + it "requires `ssl_verification_mode` => 'full'" do + expect { subject.send(:validate_ssl_settings!) 
}.to raise_error( + LogStash::ConfigurationError, + /`ssl_truststore_path` requires `ssl_verification_mode` to be `full`/ + ) + end + end + + context "when ssl_truststore_password is set with verification mode none" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_truststore_password' => 'password', + 'ssl_verification_mode' => 'none' + } + end + + it "requires `ssl_verification_mode => 'full'" do + expect { subject.send(:validate_ssl_settings!) }.to raise_error( + LogStash::ConfigurationError, + /`ssl_truststore_password` requires `ssl_truststore_path` and `ssl_verification_mode => 'full'`/ + ) + end + end + + context "when ssl_certificate_authorities is set with verification mode none" do + let(:avro_config) do + { + 'schema_uri' => 'https://schema-registry.example.com/schema.avsc', + 'ssl_certificate_authorities' => [paths[:test_path]], + 'ssl_verification_mode' => 'none' + } + end + + it "requires `ssl_verification_mode => 'full'" do + expect { subject.send(:validate_ssl_settings!) }.to raise_error( + LogStash::ConfigurationError, + /`ssl_certificate_authorities` requires `ssl_verification_mode` to be `full`/ + ) + end + end + end + end + end + end +end diff --git a/spec/unit/resources/do_not_remove_path/.gitignore b/spec/unit/resources/do_not_remove_path/.gitignore new file mode 100644 index 0000000..79bf7a1 --- /dev/null +++ b/spec/unit/resources/do_not_remove_path/.gitignore @@ -0,0 +1,2 @@ +# Empty dir for test cases to run against; imitates a readable/non-readable/writable folder +# When configs are :path validated, an existing path is required \ No newline at end of file