diff --git a/exporter/otlp-http/lib/opentelemetry/exporter/otlp/http/trace_exporter.rb b/exporter/otlp-http/lib/opentelemetry/exporter/otlp/http/trace_exporter.rb index 74c92a4f4c..55e862402c 100644 --- a/exporter/otlp-http/lib/opentelemetry/exporter/otlp/http/trace_exporter.rb +++ b/exporter/otlp-http/lib/opentelemetry/exporter/otlp/http/trace_exporter.rb @@ -25,7 +25,8 @@ class TraceExporter # rubocop:disable Metrics/ClassLength # Default timeouts in seconds. KEEP_ALIVE_TIMEOUT = 30 RETRY_COUNT = 5 - private_constant(:KEEP_ALIVE_TIMEOUT, :RETRY_COUNT) + RESPONSE_BODY_LIMIT = 4_194_304 # 4 MB + private_constant(:KEEP_ALIVE_TIMEOUT, :RETRY_COUNT, :RESPONSE_BODY_LIMIT) ERROR_MESSAGE_INVALID_HEADERS = 'headers must be a String with comma-separated URL Encoded UTF-8 k=v pairs or a Hash' private_constant(:ERROR_MESSAGE_INVALID_HEADERS) @@ -158,18 +159,19 @@ def send_bytes(bytes, timeout:) # rubocop:disable Metrics/MethodLength case response when Net::HTTPSuccess - response.body # Read and discard body + response.read_body(nil) # Discard without reading into memory SUCCESS when Net::HTTPServiceUnavailable, Net::HTTPTooManyRequests - response.body # Read and discard body + response.read_body(nil) # Discard without reading into memory redo if backoff?(retry_after: response['Retry-After'], retry_count: retry_count += 1, reason: response.code) FAILURE when Net::HTTPRequestTimeOut, Net::HTTPGatewayTimeOut, Net::HTTPBadGateway - response.body # Read and discard body + response.read_body(nil) # Discard without reading into memory redo if backoff?(retry_count: retry_count += 1, reason: response.code) FAILURE when Net::HTTPBadRequest, Net::HTTPClientError, Net::HTTPServerError - log_status(response.body) + body = read_response_body(response) + log_status(body) @metrics_reporter.add_to_counter('otel.otlp_exporter.failure', labels: { 'reason' => response.code }) FAILURE when Net::HTTPRedirection @@ -216,14 +218,42 @@ def handle_redirect(location) end def log_status(body) + truncation_note = @body_truncated ? ' (body truncated due to size limit)' : '' status = Google::Rpc::Status.decode(body) details = status.details.map do |detail| klass_or_nil = ::Google::Protobuf::DescriptorPool.generated_pool.lookup(detail.type_name).msgclass detail.unpack(klass_or_nil) if klass_or_nil end.compact - OpenTelemetry.handle_error(message: "OTLP exporter received rpc.Status{message=#{status.message}, details=#{details}}") + OpenTelemetry.handle_error(message: "OTLP exporter received rpc.Status{message=#{status.message}, details=#{details}}#{truncation_note}") rescue StandardError => e - OpenTelemetry.handle_error(exception: e, message: 'unexpected error decoding rpc.Status in OTLP::Exporter#log_status') + OpenTelemetry.handle_error(exception: e, message: "unexpected error decoding rpc.Status in OTLP::Exporter#log_status#{truncation_note}") + ensure + @body_truncated = false + end + + def read_response_body(response) + return '' if response.nil? + + body = +'' + truncated = false + + response.read_body do |chunk| + if body.bytesize + chunk.bytesize <= RESPONSE_BODY_LIMIT + body << chunk + else + remaining = RESPONSE_BODY_LIMIT - body.bytesize + body << chunk.byteslice(0, remaining) if remaining > 0 + truncated = true + break + end + end + + body.force_encoding('UTF-8') + @body_truncated = truncated + body + rescue StandardError => e + OpenTelemetry.handle_error(exception: e, message: 'error reading response body') + '' end def measure_request_duration diff --git a/exporter/otlp-http/test/opentelemetry/exporter/otlp/http/trace_exporter_test.rb b/exporter/otlp-http/test/opentelemetry/exporter/otlp/http/trace_exporter_test.rb index e5d3ef2160..c7dfe4826e 100644 --- a/exporter/otlp-http/test/opentelemetry/exporter/otlp/http/trace_exporter_test.rb +++ b/exporter/otlp-http/test/opentelemetry/exporter/otlp/http/trace_exporter_test.rb @@ -742,4 +742,81 @@ end end end + + describe 'response body reading' do + let(:exporter) { OpenTelemetry::Exporter::OTLP::HTTP::TraceExporter.new } + let(:span_data) { OpenTelemetry::TestHelpers.create_span_data } + + it 'discards body for successful responses without reading into memory' do + stub_request(:post, 'http://localhost:4318/v1/traces').to_return(status: 200, body: 'success body') + + result = exporter.export([span_data]) + + _(result).must_equal(success) + end + + it 'discards body for retryable responses without reading into memory' do + stub_request(:post, 'http://localhost:4318/v1/traces') + .to_return(status: 503, body: 'service unavailable', headers: { 'Retry-After' => '0' }) + .then.to_return(status: 200) + + result = exporter.export([span_data]) + + _(result).must_equal(success) + end + + it 'reads and parses error response body smaller than limit' do + log_stream = StringIO.new + logger = OpenTelemetry.logger + OpenTelemetry.logger = ::Logger.new(log_stream) + + details = [::Google::Protobuf::Any.pack(::Google::Protobuf::StringValue.new(value: 'error details'))] + status = ::Google::Rpc::Status.encode(::Google::Rpc::Status.new(code: 3, message: 'invalid argument', details: details)) + stub_request(:post, 'http://localhost:4318/v1/traces').to_return(status: 400, body: status) + + result = exporter.export([span_data]) + + _(result).must_equal(export_failure) + _(log_stream.string).must_match(/invalid argument/) + _(log_stream.string).wont_match(/truncated/) + ensure + OpenTelemetry.logger = logger + end + + it 'truncates error response body larger than 4 MB limit' do + log_stream = StringIO.new + logger = OpenTelemetry.logger + OpenTelemetry.logger = ::Logger.new(log_stream) + + # Create a body larger than 4 MB + large_message = 'x' * 5_000_000 # 5 MB + details = [::Google::Protobuf::Any.pack(::Google::Protobuf::StringValue.new(value: large_message))] + large_status = ::Google::Rpc::Status.new(code: 3, message: 'large error', details: details) + large_body = ::Google::Rpc::Status.encode(large_status) + + stub_request(:post, 'http://localhost:4318/v1/traces').to_return(status: 400, body: large_body) + + result = exporter.export([span_data]) + + _(result).must_equal(export_failure) + _(log_stream.string).must_match(/body truncated due to size limit/) + ensure + OpenTelemetry.logger = logger + end + + it 'handles malformed error response body gracefully' do + log_stream = StringIO.new + logger = OpenTelemetry.logger + OpenTelemetry.logger = ::Logger.new(log_stream) + + stub_request(:post, 'http://localhost:4318/v1/traces').to_return(status: 400, body: 'not valid protobuf') + + result = exporter.export([span_data]) + + _(result).must_equal(export_failure) + _(log_stream.string).must_match(/unexpected error decoding rpc.Status/) + ensure + OpenTelemetry.logger = logger + end + end end diff --git a/exporter/otlp-logs/lib/opentelemetry/exporter/otlp/logs/logs_exporter.rb b/exporter/otlp-logs/lib/opentelemetry/exporter/otlp/logs/logs_exporter.rb index 91f5d7e4db..1d32540516 100644 --- a/exporter/otlp-logs/lib/opentelemetry/exporter/otlp/logs/logs_exporter.rb +++ b/exporter/otlp-logs/lib/opentelemetry/exporter/otlp/logs/logs_exporter.rb @@ -31,7 +31,8 @@ class LogsExporter # rubocop:disable Metrics/ClassLength # Default timeouts in seconds. KEEP_ALIVE_TIMEOUT = 30 RETRY_COUNT = 5 - private_constant(:KEEP_ALIVE_TIMEOUT, :RETRY_COUNT) + RESPONSE_BODY_LIMIT = 4_194_304 # 4 MB + private_constant(:KEEP_ALIVE_TIMEOUT, :RETRY_COUNT, :RESPONSE_BODY_LIMIT) ERROR_MESSAGE_INVALID_HEADERS = 'headers must be a String with comma-separated URL Encoded UTF-8 k=v pairs or a Hash' private_constant(:ERROR_MESSAGE_INVALID_HEADERS) @@ -167,15 +168,15 @@ def send_bytes(bytes, timeout:) # rubocop:disable Metrics/CyclomaticComplexity, case response when Net::HTTPSuccess - response.body # Read and discard body + response.read_body(nil) # Discard without reading into memory SUCCESS when Net::HTTPServiceUnavailable, Net::HTTPTooManyRequests - response.body # Read and discard body + response.read_body(nil) # Discard without reading into memory handle_http_error(response) redo if backoff?(retry_after: response['Retry-After'], retry_count: retry_count += 1) FAILURE when Net::HTTPRequestTimeOut, Net::HTTPGatewayTimeOut, Net::HTTPBadGateway - response.body # Read and discard body + response.read_body(nil) # Discard without reading into memory handle_http_error(response) redo if backoff?(retry_count: retry_count += 1) FAILURE @@ -183,7 +184,8 @@ def send_bytes(bytes, timeout:) # rubocop:disable Metrics/CyclomaticComplexity, handle_http_error(response) FAILURE when Net::HTTPBadRequest, Net::HTTPClientError, Net::HTTPServerError - log_status(response.body) + body = read_response_body(response) + log_status(body) FAILURE when Net::HTTPRedirection @http.finish @@ -234,14 +236,42 @@ def handle_redirect(location) end def log_status(body) + truncation_note = @body_truncated ? ' (body truncated due to size limit)' : '' status = Google::Rpc::Status.decode(body) details = status.details.map do |detail| klass_or_nil = ::Google::Protobuf::DescriptorPool.generated_pool.lookup(detail.type_name).msgclass detail.unpack(klass_or_nil) if klass_or_nil end.compact - OpenTelemetry.handle_error(message: "OTLP logs exporter received rpc.Status{message=#{status.message}, details=#{details}}") + OpenTelemetry.handle_error(message: "OTLP logs exporter received rpc.Status{message=#{status.message}, details=#{details}}#{truncation_note}") rescue StandardError => e - OpenTelemetry.handle_error(exception: e, message: 'unexpected error decoding rpc.Status in OTLP::Exporter#log_status') + OpenTelemetry.handle_error(exception: e, message: "unexpected error decoding rpc.Status in OTLP::Exporter#log_status#{truncation_note}") + ensure + @body_truncated = false + end + + def read_response_body(response) + return '' if response.nil? + + body = +'' + truncated = false + + response.read_body do |chunk| + if body.bytesize + chunk.bytesize <= RESPONSE_BODY_LIMIT + body << chunk + else + remaining = RESPONSE_BODY_LIMIT - body.bytesize + body << chunk.byteslice(0, remaining) if remaining > 0 + truncated = true + break + end + end + + body.force_encoding('UTF-8') + @body_truncated = truncated + body + rescue StandardError => e + OpenTelemetry.handle_error(exception: e, message: 'error reading response body') + '' end def backoff?(retry_count:, retry_after: nil) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity diff --git a/exporter/otlp-logs/test/opentelemetry/exporter/otlp/logs_exporter_test.rb b/exporter/otlp-logs/test/opentelemetry/exporter/otlp/logs_exporter_test.rb index d4dccd34ea..eaf012bba9 100644 --- a/exporter/otlp-logs/test/opentelemetry/exporter/otlp/logs_exporter_test.rb +++ b/exporter/otlp-logs/test/opentelemetry/exporter/otlp/logs_exporter_test.rb @@ -955,4 +955,81 @@ end end end + + describe 'response body reading' do + let(:exporter) { OpenTelemetry::Exporter::OTLP::Logs::LogsExporter.new } + let(:log_record_data) { OpenTelemetry::TestHelpers.create_log_record_data } + + it 'discards body for successful responses without reading into memory' do + stub_request(:post, 'http://localhost:4318/v1/logs').to_return(status: 200, body: 'success body') + + result = exporter.export([log_record_data]) + + _(result).must_equal(SUCCESS) + end + + it 'discards body for retryable responses without reading into memory' do + stub_request(:post, 'http://localhost:4318/v1/logs') + .to_return(status: 503, body: 'service unavailable', headers: { 'Retry-After' => '0' }) + .then.to_return(status: 200) + + result = exporter.export([log_record_data]) + + _(result).must_equal(SUCCESS) + end + + it 'reads and parses error response body smaller than limit' do + log_stream = StringIO.new + logger = OpenTelemetry.logger + OpenTelemetry.logger = ::Logger.new(log_stream) + + details = [::Google::Protobuf::Any.pack(::Google::Protobuf::StringValue.new(value: 'error details'))] + status = ::Google::Rpc::Status.encode(::Google::Rpc::Status.new(code: 3, message: 'invalid argument', details: details)) + stub_request(:post, 'http://localhost:4318/v1/logs').to_return(status: 400, body: status) + + result = exporter.export([log_record_data]) + + _(result).must_equal(FAILURE) + _(log_stream.string).must_match(/invalid argument/) + _(log_stream.string).wont_match(/truncated/) + ensure + OpenTelemetry.logger = logger + end + + it 'truncates error response body larger than 4 MB limit' do + log_stream = StringIO.new + logger = OpenTelemetry.logger + OpenTelemetry.logger = ::Logger.new(log_stream) + + # Create a body larger than 4 MB + large_message = 'x' * 5_000_000 # 5 MB + details = [::Google::Protobuf::Any.pack(::Google::Protobuf::StringValue.new(value: large_message))] + large_status = ::Google::Rpc::Status.new(code: 3, message: 'large error', details: details) + large_body = ::Google::Rpc::Status.encode(large_status) + + stub_request(:post, 'http://localhost:4318/v1/logs').to_return(status: 400, body: large_body) + + result = exporter.export([log_record_data]) + + _(result).must_equal(FAILURE) + _(log_stream.string).must_match(/body truncated due to size limit/) + ensure + OpenTelemetry.logger = logger + end + + it 'handles malformed error response body gracefully' do + log_stream = StringIO.new + logger = OpenTelemetry.logger + OpenTelemetry.logger = ::Logger.new(log_stream) + + stub_request(:post, 'http://localhost:4318/v1/logs').to_return(status: 400, body: 'not valid protobuf') + + result = exporter.export([log_record_data]) + + _(result).must_equal(FAILURE) + _(log_stream.string).must_match(/unexpected error decoding rpc.Status/) + ensure + OpenTelemetry.logger = logger + end + end end diff --git a/exporter/otlp-metrics/lib/opentelemetry/exporter/otlp/metrics/metrics_exporter.rb b/exporter/otlp-metrics/lib/opentelemetry/exporter/otlp/metrics/metrics_exporter.rb index 9157e1fdfe..d032762d82 100644 --- a/exporter/otlp-metrics/lib/opentelemetry/exporter/otlp/metrics/metrics_exporter.rb +++ b/exporter/otlp-metrics/lib/opentelemetry/exporter/otlp/metrics/metrics_exporter.rb @@ -120,15 +120,15 @@ def send_bytes(bytes, timeout:) response = @http.request(request) case response when Net::HTTPSuccess - response.body # Read and discard body + response.read_body(nil) # Discard without reading into memory SUCCESS when Net::HTTPServiceUnavailable, Net::HTTPTooManyRequests - response.body # Read and discard body + response.read_body(nil) # Discard without reading into memory redo if backoff?(retry_after: response['Retry-After'], retry_count: retry_count += 1, reason: response.code) OpenTelemetry.logger.warn('Net::HTTPServiceUnavailable/Net::HTTPTooManyRequests in MetricsExporter#send_bytes') FAILURE when Net::HTTPRequestTimeOut, Net::HTTPGatewayTimeOut, Net::HTTPBadGateway - response.body # Read and discard body + response.read_body(nil) # Discard without reading into memory redo if backoff?(retry_count: retry_count += 1, reason: response.code) OpenTelemetry.logger.warn('Net::HTTPRequestTimeOut/Net::HTTPGatewayTimeOut/Net::HTTPBadGateway in MetricsExporter#send_bytes') FAILURE @@ -136,7 +136,8 @@ def send_bytes(bytes, timeout:) OpenTelemetry.handle_error(message: "OTLP metrics_exporter received http.code=404 for uri: '#{@path}'") FAILURE when Net::HTTPBadRequest, Net::HTTPClientError, Net::HTTPServerError - log_status(response.body) + body = read_response_body(response) + log_status(body) OpenTelemetry.logger.warn('Net::HTTPBadRequest/Net::HTTPClientError/Net::HTTPServerError in MetricsExporter#send_bytes') FAILURE when Net::HTTPRedirection diff --git a/exporter/otlp-metrics/lib/opentelemetry/exporter/otlp/metrics/util.rb b/exporter/otlp-metrics/lib/opentelemetry/exporter/otlp/metrics/util.rb index efcbc6ff87..f4a7a51a62 100644 --- a/exporter/otlp-metrics/lib/opentelemetry/exporter/otlp/metrics/util.rb +++ b/exporter/otlp-metrics/lib/opentelemetry/exporter/otlp/metrics/util.rb @@ -12,8 +12,10 @@ module Metrics module Util # rubocop:disable Metrics/ModuleLength KEEP_ALIVE_TIMEOUT = 30 RETRY_COUNT = 5 + RESPONSE_BODY_LIMIT = 4_194_304 # 4 MB ERROR_MESSAGE_INVALID_HEADERS = 'headers must be a String with comma-separated URL Encoded UTF-8 k=v pairs or a Hash' DEFAULT_USER_AGENT = "OTel-OTLP-MetricsExporter-Ruby/#{OpenTelemetry::Exporter::OTLP::Metrics::VERSION} Ruby/#{RUBY_VERSION} (#{RUBY_PLATFORM}; #{RUBY_ENGINE}/#{RUBY_ENGINE_VERSION})".freeze + private_constant(:RESPONSE_BODY_LIMIT) def http_connection(uri, ssl_verify_mode, certificate_file, client_certificate_file, client_key_file) http = Net::HTTP.new(uri.hostname, uri.port) @@ -115,15 +117,44 @@ def backoff?(retry_count:, reason:, retry_after: nil) end def log_status(body) + truncation_note = @body_truncated ? ' (body truncated due to size limit)' : '' status = Google::Rpc::Status.decode(body) details = status.details.map do |detail| type_name = detail.type_url.to_s.split('/').last.to_s klass_or_nil = ::Google::Protobuf::DescriptorPool.generated_pool.lookup(type_name)&.msgclass detail.unpack(klass_or_nil) if klass_or_nil end.compact - OpenTelemetry.handle_error(message: "OTLP metrics_exporter received rpc.Status{message=#{status.message}, details=#{details}}") + OpenTelemetry.handle_error(message: "OTLP metrics_exporter received rpc.Status{message=#{status.message}, details=#{details}}#{truncation_note}") rescue StandardError => e - OpenTelemetry.handle_error(exception: e, message: 'unexpected error decoding rpc.Status in OTLP::MetricsExporter#log_status') + OpenTelemetry.handle_error(exception: e, message: "unexpected error decoding rpc.Status in OTLP::MetricsExporter#log_status#{truncation_note}") + ensure + @body_truncated = false + end + + def read_response_body(response) + return '' if response.nil? + + # Stream read with 4 MB limit + body = +'' + truncated = false + + response.read_body do |chunk| + if body.bytesize + chunk.bytesize <= RESPONSE_BODY_LIMIT + body << chunk + else + remaining = RESPONSE_BODY_LIMIT - body.bytesize + body << chunk.byteslice(0, remaining) if remaining > 0 + truncated = true + break + end + end + + body.force_encoding('UTF-8') + @body_truncated = truncated + body + rescue StandardError => e + OpenTelemetry.handle_error(exception: e, message: 'error reading response body') + '' end def handle_redirect(location); end diff --git a/exporter/otlp-metrics/test/opentelemetry/exporter/otlp/metrics/metrics_exporter_test.rb b/exporter/otlp-metrics/test/opentelemetry/exporter/otlp/metrics/metrics_exporter_test.rb index 7debfd74c3..9eecbc12ce 100644 --- a/exporter/otlp-metrics/test/opentelemetry/exporter/otlp/metrics/metrics_exporter_test.rb +++ b/exporter/otlp-metrics/test/opentelemetry/exporter/otlp/metrics/metrics_exporter_test.rb @@ -913,4 +913,81 @@ end end end + + describe 'response body reading' do + let(:exporter) { OpenTelemetry::Exporter::OTLP::Metrics::MetricsExporter.new } + let(:metric_data) { create_metrics_data } + + it 'discards body for successful responses without reading into memory' do + stub_request(:post, 'http://localhost:4318/v1/metrics').to_return(status: 200, body: 'success body') + + result = exporter.export([metric_data]) + + _(result).must_equal(METRICS_SUCCESS) + end + + it 'discards body for retryable responses without reading into memory' do + stub_request(:post, 'http://localhost:4318/v1/metrics') + .to_return(status: 503, body: 'service unavailable', headers: { 'Retry-After' => '0' }) + .then.to_return(status: 200) + + result = exporter.export([metric_data]) + + _(result).must_equal(METRICS_SUCCESS) + end + + it 'reads and parses error response body smaller than limit' do + log_stream = StringIO.new + logger = OpenTelemetry.logger + OpenTelemetry.logger = ::Logger.new(log_stream) + + details = [::Google::Protobuf::Any.pack(::Google::Protobuf::StringValue.new(value: 'error details'))] + status = ::Google::Rpc::Status.encode(::Google::Rpc::Status.new(code: 3, message: 'invalid argument', details: details)) + stub_request(:post, 'http://localhost:4318/v1/metrics').to_return(status: 400, body: status) + + result = exporter.export([metric_data]) + + _(result).must_equal(METRICS_FAILURE) + _(log_stream.string).must_match(/invalid argument/) + _(log_stream.string).wont_match(/truncated/) + ensure + OpenTelemetry.logger = logger + end + + it 'truncates error response body larger than 4 MB limit' do + log_stream = StringIO.new + logger = OpenTelemetry.logger + OpenTelemetry.logger = ::Logger.new(log_stream) + + # Create a body larger than 4 MB + large_message = 'x' * 5_000_000 # 5 MB + details = [::Google::Protobuf::Any.pack(::Google::Protobuf::StringValue.new(value: large_message))] + large_status = ::Google::Rpc::Status.new(code: 3, message: 'large error', details: details) + large_body = ::Google::Rpc::Status.encode(large_status) + + stub_request(:post, 'http://localhost:4318/v1/metrics').to_return(status: 400, body: large_body) + + result = exporter.export([metric_data]) + + _(result).must_equal(METRICS_FAILURE) + _(log_stream.string).must_match(/body truncated due to size limit/) + ensure + OpenTelemetry.logger = logger + end + + it 'handles malformed error response body gracefully' do + log_stream = StringIO.new + logger = OpenTelemetry.logger + OpenTelemetry.logger = ::Logger.new(log_stream) + + stub_request(:post, 'http://localhost:4318/v1/metrics').to_return(status: 400, body: 'not valid protobuf') + + result = exporter.export([metric_data]) + + _(result).must_equal(METRICS_FAILURE) + _(log_stream.string).must_match(/unexpected error decoding rpc.Status/) + ensure + OpenTelemetry.logger = logger + end + end end diff --git a/exporter/otlp/lib/opentelemetry/exporter/otlp/exporter.rb b/exporter/otlp/lib/opentelemetry/exporter/otlp/exporter.rb index 9647177f61..737cdbc656 100644 --- a/exporter/otlp/lib/opentelemetry/exporter/otlp/exporter.rb +++ b/exporter/otlp/lib/opentelemetry/exporter/otlp/exporter.rb @@ -28,7 +28,8 @@ class Exporter # rubocop:disable Metrics/ClassLength # Default timeouts in seconds. KEEP_ALIVE_TIMEOUT = 30 RETRY_COUNT = 5 - private_constant(:KEEP_ALIVE_TIMEOUT, :RETRY_COUNT) + RESPONSE_BODY_LIMIT = 4_194_304 # 4 MB + private_constant(:KEEP_ALIVE_TIMEOUT, :RETRY_COUNT, :RESPONSE_BODY_LIMIT) ERROR_MESSAGE_INVALID_HEADERS = 'headers must be a String with comma-separated URL Encoded UTF-8 k=v pairs or a Hash' private_constant(:ERROR_MESSAGE_INVALID_HEADERS) @@ -174,36 +175,49 @@ def send_bytes(bytes, timeout:) # rubocop:disable Metrics/CyclomaticComplexity, @http.read_timeout = remaining_timeout @http.write_timeout = remaining_timeout @http.start unless @http.started? - response = measure_request_duration { @http.request(request) } - - case response - when Net::HTTPSuccess - response.body # Read and discard body - SUCCESS - when Net::HTTPServiceUnavailable, Net::HTTPTooManyRequests - response.body # Read and discard body - redo if backoff?(retry_after: response['Retry-After'], retry_count: retry_count += 1, reason: response.code) - FAILURE - when Net::HTTPRequestTimeOut, Net::HTTPGatewayTimeOut, Net::HTTPBadGateway - response.body # Read and discard body - redo if backoff?(retry_count: retry_count += 1, reason: response.code) - FAILURE - when Net::HTTPNotFound - log_request_failure(response.code) - FAILURE - when Net::HTTPBadRequest, Net::HTTPClientError, Net::HTTPServerError - log_status(response.body) - @metrics_reporter.add_to_counter('otel.otlp_exporter.failure', labels: { 'reason' => response.code }) - FAILURE - when Net::HTTPRedirection - @http.finish - handle_redirect(response['location']) - redo if backoff?(retry_after: 0, retry_count: retry_count += 1, reason: response.code) - else - @http.finish - log_request_failure(response.code) - FAILURE + result = nil + should_redo = false + + measure_request_duration do + @http.request(request) do |response| + case response + when Net::HTTPSuccess + response.read_body { |_| } # Drain and discard, preserves keep-alive + result = SUCCESS + when Net::HTTPServiceUnavailable, Net::HTTPTooManyRequests + response.read_body { |_| } + should_redo = backoff?(retry_after: response['Retry-After'], retry_count: retry_count += 1, reason: response.code) + result = FAILURE + when Net::HTTPRequestTimeOut, Net::HTTPGatewayTimeOut, Net::HTTPBadGateway + response.read_body { |_| } + should_redo = backoff?(retry_count: retry_count += 1, reason: response.code) + result = FAILURE + when Net::HTTPNotFound + response.read_body { |_| } + log_request_failure(response.code) + result = FAILURE + when Net::HTTPBadRequest, Net::HTTPClientError, Net::HTTPServerError + body, truncated = read_response_body(response) + log_status(body, truncated: truncated) + @metrics_reporter.add_to_counter('otel.otlp_exporter.failure', labels: { 'reason' => response.code }) + result = FAILURE + when Net::HTTPRedirection + response.read_body { |_| } + @http.finish + handle_redirect(response['location']) + should_redo = backoff?(retry_after: 0, retry_count: retry_count += 1, reason: response.code) + else + response.read_body { |_| } + @http.finish + log_request_failure(response.code) + result = FAILURE + end + end end + + redo if should_redo + + result rescue Net::OpenTimeout, Net::ReadTimeout retry if backoff?(retry_count: retry_count += 1, reason: 'timeout') return FAILURE @@ -239,7 +253,12 @@ def handle_redirect(location) # TODO: figure out destination and reinitialize @http and @path end - def log_status(body) + def log_status(body, truncated: false) + if truncated + OpenTelemetry.handle_error(message: "OTLP exporter received an oversized error response body (truncated at #{RESPONSE_BODY_LIMIT} bytes) for uri=#{@uri}") + return + end + status = Google::Rpc::Status.decode(body) details = status.details.map do |detail| klass_or_nil = ::Google::Protobuf::DescriptorPool.generated_pool.lookup(detail.type_name).msgclass @@ -250,6 +269,39 @@ def log_status(body) OpenTelemetry.handle_error(exception: e, message: 'unexpected error decoding rpc.Status in OTLP::Exporter#log_status') end + def read_response_body(response) + return ['', false] if response.nil? + + body, truncated = collect_response_chunks(response) + body.force_encoding('UTF-8') + [body, truncated] + rescue StandardError => e + OpenTelemetry.handle_error(exception: e, message: 'error reading response body') + ['', false] + end + + def collect_response_chunks(response) + body = +'' + truncated = false + + response.read_body do |chunk| + remaining = RESPONSE_BODY_LIMIT - body.bytesize + body << chunk.byteslice(0, remaining) + + if chunk.bytesize > remaining + truncated = true + @http.finish # closes socket, nil's the body or else net/http will attempt to read the rest of the response + break + end + end + + [body, truncated] + rescue IOError + raise unless truncated # we'll handle this when we know net/http is upset trying to read after http.finish + + [body || '', truncated] + end + def log_request_failure(response_code) OpenTelemetry.handle_error(message: "OTLP exporter received http.code=#{response_code} for uri='#{@uri}' in OTLP::Exporter#send_bytes") @metrics_reporter.add_to_counter('otel.otlp_exporter.failure', labels: { 'reason' => response_code }) diff --git a/exporter/otlp/test/opentelemetry/exporter/otlp/exporter_test.rb b/exporter/otlp/test/opentelemetry/exporter/otlp/exporter_test.rb index e5a1dc5e0a..7be054c30e 100644 --- a/exporter/otlp/test/opentelemetry/exporter/otlp/exporter_test.rb +++ b/exporter/otlp/test/opentelemetry/exporter/otlp/exporter_test.rb @@ -1041,4 +1041,237 @@ def create_link(span_context) OpenTelemetry::Trace::Link.new(span_context, { 'link-attribute' => 'link-value' }) end end + + describe 'response body reading with real sockets' do + # These tests use a real TCPServer instead of WebMock to verify + # behavior against actual Net::HTTP socket I/O. WebMock's patching + # of Net::HTTP changes how read_body works — even with + # allow_net_connect!, responses go through WebMock's adapter which + # doesn't exercise the same code paths as real Net::HTTP. + + def with_fake_server(response_body_size:, status: 400) + server = TCPServer.new('127.0.0.1', 0) + port = server.addr[1] + body = 'X' * response_body_size + + server_thread = Thread.new { handle_fake_request(server, body, status) } + + # Fully disable WebMock's Net::HTTP adapter so we get real + # socket behavior, not WebMock's patched read_body. + WebMock::HttpLibAdapters::NetHttpAdapter.disable! + yield port + ensure + WebMock::HttpLibAdapters::NetHttpAdapter.enable! + server&.close + server_thread&.join(2) + end + + def handle_fake_request(server, body, status) + client = server.accept + content_length = read_content_length(client) + client.read(content_length) if content_length > 0 + + client.print "HTTP/1.1 #{status} Bad Request\r\n" + client.print "Content-Type: application/octet-stream\r\n" + client.print "Content-Length: #{body.bytesize}\r\n" + client.print "Connection: close\r\n" + client.print "\r\n" + client.write body + client.close + rescue StandardError + # client may disconnect early + end + + def read_content_length(client) + content_length = 0 + while (line = client.gets) && line != "\r\n" + content_length = line.split(': ', 2).last.to_i if line.start_with?('Content-Length') + end + content_length + end + + it 'limits error response body read to 4 MB against a real HTTP server' do + log_stream = StringIO.new + logger = OpenTelemetry.logger + OpenTelemetry.logger = ::Logger.new(log_stream) + + with_fake_server(response_body_size: 5_000_000) do |port| + exporter = OpenTelemetry::Exporter::OTLP::Exporter.new( + endpoint: "http://127.0.0.1:#{port}/v1/traces" + ) + span_data = OpenTelemetry::TestHelpers.create_span_data + result = exporter.export([span_data]) + + _(result).must_equal(FAILURE) + # The chunked reader should cap at 4 MB and log a clear oversized message. + _(log_stream.string).must_match(/oversized error response body/) + _(log_stream.string).wont_match(/unexpected error decoding/) + # And it should work without hitting "read_body called twice", + # which would mean the body was already fully read into memory. + _(log_stream.string).wont_match(/read_body called twice/) + end + ensure + OpenTelemetry.logger = logger + end + + it 'does not buffer beyond the limit internally' do + captured_body_size = nil + internal_body = :not_checked + + spy = Module.new do + define_method(:read_response_body) do |response| + super(response).tap do |result_body, _truncated| + captured_body_size = result_body.bytesize + internal_body = response.body + end + end + end + + limit = OpenTelemetry::Exporter::OTLP::Exporter.const_get(:RESPONSE_BODY_LIMIT) + + with_fake_server(response_body_size: 10_000_000) do |port| + exporter = OpenTelemetry::Exporter::OTLP::Exporter.new( + endpoint: "http://127.0.0.1:#{port}/v1/traces" + ) + exporter.singleton_class.prepend(spy) + + OpenTelemetry.logger = ::Logger.new(File::NULL) + exporter.export([OpenTelemetry::TestHelpers.create_span_data]) + + # read_response_body must cap the returned body at the limit + _(captured_body_size).wont_be_nil + _(captured_body_size).must_be :<=, limit + + # Net::HTTP must NOT have the full 10 MB response buffered internally. + # After @http.finish closes the socket mid-read, response.body is nil + # (block-form read_body doesn't accumulate into @body). + if internal_body.is_a?(String) + _(internal_body.bytesize).must_be :<=, limit, + "Net::HTTP buffered #{internal_body.bytesize} bytes internally, exceeding the #{limit} byte limit" + end + end + end + end + + describe 'response body reading' do + let(:exporter) { OpenTelemetry::Exporter::OTLP::Exporter.new } + let(:span_data) { OpenTelemetry::TestHelpers.create_span_data } + + it 'discards body for successful responses without reading into memory' do + stub_request(:post, 'http://localhost:4318/v1/traces').to_return(status: 200, body: 'success body') + + result = exporter.export([span_data]) + + _(result).must_equal(SUCCESS) + end + + it 'discards body for retryable responses without reading into memory' do + stub_request(:post, 'http://localhost:4318/v1/traces') + .to_return(status: 503, body: 'service unavailable', headers: { 'Retry-After' => '0' }) + .then.to_return(status: 200) + + result = exporter.export([span_data]) + + _(result).must_equal(SUCCESS) + end + + it 'reads and parses error response body smaller than limit' do + log_stream = StringIO.new + logger = OpenTelemetry.logger + OpenTelemetry.logger = ::Logger.new(log_stream) + + details = [::Google::Protobuf::Any.pack(::Google::Protobuf::StringValue.new(value: 'error details'))] + status = ::Google::Rpc::Status.encode(::Google::Rpc::Status.new(code: 3, message: 'invalid argument', details: details)) + stub_request(:post, 'http://localhost:4318/v1/traces').to_return(status: 400, body: status) + + result = exporter.export([span_data]) + + _(result).must_equal(FAILURE) + _(log_stream.string).must_match(/invalid argument/) + _(log_stream.string).wont_match(/truncated/) + ensure + OpenTelemetry.logger = logger + end + + it 'truncates error response body larger than 4 MB limit' do + log_stream = StringIO.new + logger = OpenTelemetry.logger + OpenTelemetry.logger = ::Logger.new(log_stream) + + # Create a body larger than 4 MB + large_message = 'x' * 5_000_000 # 5 MB + details = [::Google::Protobuf::Any.pack(::Google::Protobuf::StringValue.new(value: large_message))] + large_status = ::Google::Rpc::Status.new(code: 3, message: 'large error', details: details) + large_body = ::Google::Rpc::Status.encode(large_status) + + stub_request(:post, 'http://localhost:4318/v1/traces').to_return(status: 400, body: large_body) + + result = exporter.export([span_data]) + + _(result).must_equal(FAILURE) + _(log_stream.string).must_match(/oversized error response body/) + _(log_stream.string).wont_match(/unexpected error decoding/) + ensure + OpenTelemetry.logger = logger + end + + it 'handles error response body at exactly 4 MB limit' do + log_stream = StringIO.new + logger = OpenTelemetry.logger + OpenTelemetry.logger = ::Logger.new(log_stream) + + # Create a body at exactly 4 MB + exact_size_message = 'y' * (4_194_304 - 100) # Account for protobuf overhead + details = [::Google::Protobuf::Any.pack(::Google::Protobuf::StringValue.new(value: exact_size_message))] + exact_status = ::Google::Rpc::Status.new(code: 3, message: 'exact size', details: details) + exact_body = ::Google::Rpc::Status.encode(exact_status) + + # Skip if encoded body is still larger than 4 MB due to protobuf overhead + skip 'Protobuf overhead makes this test impractical' if exact_body.bytesize > 4_194_304 + + stub_request(:post, 'http://localhost:4318/v1/traces').to_return(status: 400, body: exact_body) + + result = exporter.export([span_data]) + + _(result).must_equal(FAILURE) + ensure + OpenTelemetry.logger = logger + end + + it 'handles malformed error response body gracefully' do + log_stream = StringIO.new + logger = OpenTelemetry.logger + OpenTelemetry.logger = ::Logger.new(log_stream) + + stub_request(:post, 'http://localhost:4318/v1/traces').to_return(status: 400, body: 'not valid protobuf') + + result = exporter.export([span_data]) + + _(result).must_equal(FAILURE) + _(log_stream.string).must_match(/unexpected error decoding rpc.Status/) + ensure + OpenTelemetry.logger = logger + end + + it 'handles truncated protobuf in error response' do + log_stream = StringIO.new + logger = OpenTelemetry.logger + OpenTelemetry.logger = ::Logger.new(log_stream) + + # Create a large protobuf that will be truncated, making it invalid + large_message = 'z' * 5_000_000 + details = [::Google::Protobuf::Any.pack(::Google::Protobuf::StringValue.new(value: large_message))] + large_status = ::Google::Rpc::Status.new(code: 3, message: 'truncation test', details: details) + large_body = ::Google::Rpc::Status.encode(large_status) + + stub_request(:post, 'http://localhost:4318/v1/traces').to_return(status: 400, body: large_body) + + result = exporter.export([span_data]) + + _(result).must_equal(FAILURE) + _(log_stream.string).must_match(/oversized error response body/) + ensure + OpenTelemetry.logger = logger + end + end end