From 38a864bce3d683d462d9524f3c2abe488baf8a2b Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Thu, 19 Mar 2026 14:02:12 -0400 Subject: [PATCH] oboe_sampler refactor and use pq for transaction name expire management --- .rubocop.yml | 2 + .../opentelemetry/otlp_processor.rb | 7 +- lib/solarwinds_apm/otel_config.rb | 10 +- lib/solarwinds_apm/sampling/oboe_sampler.rb | 125 +++++++------ .../support/txn_name_manager.rb | 125 ++++++++++--- test/api/set_transaction_name_test.rb | 1 + test/opentelemetry/otlp_processor_test.rb | 15 +- .../otel_config_log_bridge_test.rb | 4 + .../otel_config_propagator_test.rb | 2 +- test/support/txn_name_manager_test.rb | 169 +++++++++++++++++- 10 files changed, 365 insertions(+), 95 deletions(-) diff --git a/.rubocop.yml b/.rubocop.yml index e6e525d4..54651a73 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -57,3 +57,5 @@ Style/StringLiterals: - 'Gemfile' Naming/PredicateMethod: Enabled: false +Style/OneClassPerFile: + Enabled: false diff --git a/lib/solarwinds_apm/opentelemetry/otlp_processor.rb b/lib/solarwinds_apm/opentelemetry/otlp_processor.rb index dded1de0..60a3ca7b 100644 --- a/lib/solarwinds_apm/opentelemetry/otlp_processor.rb +++ b/lib/solarwinds_apm/opentelemetry/otlp_processor.rb @@ -30,7 +30,6 @@ def initialize(txn_manager) @txn_manager = txn_manager @metrics = init_response_time_metrics @is_lambda = SolarWindsAPM::Utils.determine_lambda - @transaction_name = nil end # @param [Span] span the (mutable) {Span} that just started. @@ -52,8 +51,8 @@ def on_finishing(span) SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] processor on_finishing span attributes: #{span.attributes}" } return if non_entry_span(span: span) - @transaction_name = calculate_transaction_names(span) - span.set_attribute(SW_TRANSACTION_NAME, @transaction_name) + transaction_name = calculate_transaction_names(span) + span.set_attribute(SW_TRANSACTION_NAME, transaction_name) @txn_manager.delete_root_context_h(span.context.hex_trace_id) SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] processor on_finishing end" } end @@ -108,7 +107,7 @@ def init_response_time_metrics def meter_attributes(span) meter_attrs = { SW_IS_ERROR => error?(span) == 1, - SW_TRANSACTION_NAME => @transaction_name + SW_TRANSACTION_NAME => span.attributes[SW_TRANSACTION_NAME] } is_http_span = span_http?(span) diff --git a/lib/solarwinds_apm/otel_config.rb b/lib/solarwinds_apm/otel_config.rb index 388897a1..ba80acbe 100644 --- a/lib/solarwinds_apm/otel_config.rb +++ b/lib/solarwinds_apm/otel_config.rb @@ -17,10 +17,12 @@ module OTelConfig @@config = {} @@config_map = {} @@agent_enabled = false + @@mutex = ::Mutex.new RESOURCE_ATTRIBUTES = 'RESOURCE_ATTRIBUTES' def self.initialize + @@mutex.synchronize { return if @@agent_enabled } return unless defined?(::OpenTelemetry::SDK::Configurator) is_lambda = SolarWindsAPM::Utils.determine_lambda @@ -87,7 +89,7 @@ def self.initialize txn_manager = TxnNameManager.new otlp_processor = SolarWindsAPM::OpenTelemetry::OTLPProcessor.new(txn_manager) - @@config[:metrics_processor] = otlp_processor + @@mutex.synchronize { @@config[:metrics_processor] = otlp_processor } ::OpenTelemetry.tracer_provider.add_span_processor(otlp_processor) # collector, service and headers are used for http sampler get settings @@ -113,17 +115,17 @@ def self.initialize remote_parent_not_sampled: sampler ) - @@agent_enabled = true + @@mutex.synchronize { @@agent_enabled = true } nil end def self.[](key) - @@config[key.to_sym] + @@mutex.synchronize { @@config[key.to_sym] } end def self.agent_enabled - @@agent_enabled + @@mutex.synchronize { @@agent_enabled } end def self.resolve_response_propagator diff --git a/lib/solarwinds_apm/sampling/oboe_sampler.rb b/lib/solarwinds_apm/sampling/oboe_sampler.rb index b4b30311..271be1b2 100644 --- a/lib/solarwinds_apm/sampling/oboe_sampler.rb +++ b/lib/solarwinds_apm/sampling/oboe_sampler.rb @@ -51,15 +51,7 @@ def should_sample?(params) @logger.debug { "[#{self.class}/#{__method__}] should_sample? params: #{params.inspect}; span type is #{type}" } # For local spans, we always trust the parent - if type == SolarWindsAPM::SpanType::LOCAL - if parent_span.context.trace_flags.sampled? - return OTEL_SAMPLING_RESULT.new(decision: OTEL_SAMPLING_DECISION::RECORD_AND_SAMPLE, - tracestate: DEFAULT_TRACESTATE) - else - return OTEL_SAMPLING_RESULT.new(decision: OTEL_SAMPLING_DECISION::DROP, - tracestate: DEFAULT_TRACESTATE) - end - end + return handle_local_span(parent_span) if type == SolarWindsAPM::SpanType::LOCAL sample_state = SampleState.new(OTEL_SAMPLING_DECISION::DROP, attributes || {}, @@ -73,58 +65,90 @@ def should_sample?(params) @counters[:request_count].add(1) - # adding trigger trace attributes to sample_state attribute as part of decision - if sample_state.headers['X-Trace-Options'] + # Parse and validate trace options; may return early on invalid signature + early_result = apply_trace_options(sample_state, parent_span) + return early_result if early_result - # TraceOptions.parse_trace_options return TriggerTraceOptions - sample_state.trace_options = ::SolarWindsAPM::TraceOptions.parse_trace_options(sample_state.headers['X-Trace-Options'], @logger) + # Check settings availability + early_result = check_settings_available(sample_state, parent_span) + return early_result if early_result - @logger.debug { "[#{self.class}/#{__method__}] sample_state.trace_options: #{sample_state.trace_options.inspect}" } + resolve_sampling_algorithm(sample_state) - if sample_state.headers['X-Trace-Options-Signature'] + xtracestate = generate_new_tracestate(parent_span, sample_state) + @logger.debug { "[#{self.class}/#{__method__}] final sampling state: #{sample_state.inspect}" } + + OTEL_SAMPLING_RESULT.new(decision: sample_state.decision, + tracestate: xtracestate, + attributes: sample_state.attributes) + end - # this validate_signature is the function from trace_options file - sample_state.trace_options.response.auth = TraceOptions.validate_signature( - sample_state.headers['X-Trace-Options'], - sample_state.headers['X-Trace-Options-Signature'], - sample_state.settings[:signature_key], - sample_state.trace_options.timestamp - ) + private - # If the request has an invalid signature, drop the trace - if sample_state.trace_options.response.auth != Auth::OK - @logger.debug { "[#{self.class}/#{__method__}] signature invalid; tracing disabled (auth=#{sample_state.trace_options.response.auth})" } + def handle_local_span(parent_span) + if parent_span.context.trace_flags.sampled? + OTEL_SAMPLING_RESULT.new(decision: OTEL_SAMPLING_DECISION::RECORD_AND_SAMPLE, + tracestate: DEFAULT_TRACESTATE) + else + OTEL_SAMPLING_RESULT.new(decision: OTEL_SAMPLING_DECISION::DROP, + tracestate: DEFAULT_TRACESTATE) + end + end - xtracestate = generate_new_tracestate(parent_span, sample_state) - return OTEL_SAMPLING_RESULT.new(decision: OTEL_SAMPLING_DECISION::DROP, - tracestate: xtracestate, - attributes: sample_state.attributes) - end + # Parses X-Trace-Options, validates signature, and applies trace option + # attributes. Returns an early OTEL_SAMPLING_RESULT on invalid signature, + # or nil to continue normal processing. + def apply_trace_options(sample_state, parent_span) + return unless sample_state.headers['X-Trace-Options'] + + sample_state.trace_options = ::SolarWindsAPM::TraceOptions.parse_trace_options(sample_state.headers['X-Trace-Options'], @logger) + @logger.debug { "[#{self.class}/#{__method__}] sample_state.trace_options: #{sample_state.trace_options.inspect}" } + + if sample_state.headers['X-Trace-Options-Signature'] + sample_state.trace_options.response.auth = TraceOptions.validate_signature( + sample_state.headers['X-Trace-Options'], + sample_state.headers['X-Trace-Options-Signature'], + sample_state.settings[:signature_key], + sample_state.trace_options.timestamp + ) + + if sample_state.trace_options.response.auth != Auth::OK + @logger.debug { "[#{self.class}/#{__method__}] signature invalid; tracing disabled (auth=#{sample_state.trace_options.response.auth})" } + xtracestate = generate_new_tracestate(parent_span, sample_state) + return OTEL_SAMPLING_RESULT.new(decision: OTEL_SAMPLING_DECISION::DROP, + tracestate: xtracestate, + attributes: sample_state.attributes) end - - # Apply trace options to span attributes and list ignored keys in response - sample_state.trace_options.response.trigger_trace = TriggerTrace::NOT_REQUESTED unless sample_state.trace_options.trigger_trace - sample_state.attributes[SW_KEYS_ATTRIBUTE] = sample_state.trace_options[:sw_keys] if sample_state.trace_options[:sw_keys] - sample_state.trace_options.custom.each { |k, v| sample_state.attributes[k] = v } - sample_state.trace_options.response.ignored = sample_state.trace_options[:ignored].map { |k, _| k } if sample_state.trace_options[:ignored].any? end - unless sample_state.settings - @logger.debug { "[#{self.class}/#{__method__}] settings unavailable; sampling disabled" } + # Apply trace options to span attributes and list ignored keys in response + sample_state.trace_options.response.trigger_trace = TriggerTrace::NOT_REQUESTED unless sample_state.trace_options.trigger_trace + sample_state.attributes[SW_KEYS_ATTRIBUTE] = sample_state.trace_options[:sw_keys] if sample_state.trace_options[:sw_keys] + sample_state.trace_options.custom.each { |k, v| sample_state.attributes[k] = v } + sample_state.trace_options.response.ignored = sample_state.trace_options[:ignored].map { |k, _| k } if sample_state.trace_options[:ignored].any? - sample_state.trace_options.response.trigger_trace = TriggerTrace::SETTINGS_NOT_AVAILABLE if sample_state.trace_options&.trigger_trace + nil + end - xtracestate = generate_new_tracestate(parent_span, sample_state) + # Returns an early OTEL_SAMPLING_RESULT when settings are unavailable, + # or nil to continue normal processing. + def check_settings_available(sample_state, parent_span) + return if sample_state.settings - return OTEL_SAMPLING_RESULT.new(decision: OTEL_SAMPLING_DECISION::DROP, - tracestate: xtracestate, - attributes: sample_state.attributes) - end + @logger.debug { "[#{self.class}/#{__method__}] settings unavailable; sampling disabled" } + sample_state.trace_options.response.trigger_trace = TriggerTrace::SETTINGS_NOT_AVAILABLE if sample_state.trace_options&.trigger_trace + xtracestate = generate_new_tracestate(parent_span, sample_state) + + OTEL_SAMPLING_RESULT.new(decision: OTEL_SAMPLING_DECISION::DROP, + tracestate: xtracestate, + attributes: sample_state.attributes) + end + # Decide which sampling algo to use based on tracestate and flags + # https://swicloud.atlassian.net/wiki/spaces/NIT/pages/3815473156/Tracing+Decision+Tree + def resolve_sampling_algorithm(sample_state) @logger.debug { "[#{self.class}/#{__method__}] sample_state before deciding sampling algo: #{sample_state.inspect}" } - # Decide which sampling algo to use and add sampling attribute to decision attributes - # https://swicloud.atlassian.net/wiki/spaces/NIT/pages/3815473156/Tracing+Decision+Tree if sample_state.trace_state && TRACESTATE_REGEXP.match?(sample_state.trace_state) parent_based_algo(sample_state) elsif sample_state.settings[:flags].anybits?(Flags::SAMPLE_START) @@ -136,15 +160,10 @@ def should_sample?(params) else disabled_algo(sample_state) end - - xtracestate = generate_new_tracestate(parent_span, sample_state) - @logger.debug { "[#{self.class}/#{__method__}] final sampling state: #{sample_state.inspect}" } - - OTEL_SAMPLING_RESULT.new(decision: sample_state.decision, - tracestate: xtracestate, - attributes: sample_state.attributes) end + public + def parent_based_algo(sample_state) @logger.debug { "[#{self.class}/#{__method__}] parent_based_algo start" } diff --git a/lib/solarwinds_apm/support/txn_name_manager.rb b/lib/solarwinds_apm/support/txn_name_manager.rb index d1158348..6334f9ea 100644 --- a/lib/solarwinds_apm/support/txn_name_manager.rb +++ b/lib/solarwinds_apm/support/txn_name_manager.rb @@ -8,7 +8,9 @@ module SolarWindsAPM # Transaction Name Manager is sole for enabling solarwinds api to call set_transaction_name - # 200 unique transaction naming per 60 seconds for the transaction naming + # 200 unique transaction naming per 60 seconds for the transaction naming. + # Uses a single cleanup thread with a min-heap priority queue instead of + # spawning one thread per unique transaction name. class TxnNameManager MAX_CARDINALITY = 200 DEFAULT_TXN_NAME = 'other' @@ -20,15 +22,17 @@ class TxnNameManager # @cache store the transaction name that will be used for exporter # to set transaction name for attributes, get by trace_span_id # set(trace_span_id, transaction_name) - # @transaction_name store the transaction name and its expire call - # set(transaction_name, thread {@cache.del()}) + # @transaction_name tracks known unique transaction names within the TTL window def initialize @cache = {} # for mapping trace_span_id to transaction name - @transaction_name = {} # only make sure the cardinality requirement that 200 unique txn within 60s - # use for setting DEFAULT_TXN_NAME (in @cache) when cardinality reach the maximal - @root_context_h = {} # for set_transaction_name api call + @transaction_name = {} # maps txn_name -> expiry time (epoch seconds) + @expiry_heap = [] # min-heap of [expiry_time, txn_name] for efficient cleanup + @root_context_h = {} # for set_transaction_name api call @mutex = Mutex.new @root_context_mutex = Mutex.new + @cleanup_cond = ConditionVariable.new + @cleanup_thread = nil + @stopped = false end def get(key) @@ -40,22 +44,18 @@ def del(key) end def set(key, value) - # new name and room in pool -> add name to pool and schedule for removal after ttl -> return name - # new name but no room in pool -> return default name - # existing name -> cancel previously scheduled removal -> schedule new removal -> return name txn_name = value.slice(0, SolarWindsAPM::Constants::MAX_TXN_NAME_LENGTH) - txn_name_exist = false + expiry = Time.now.to_i + TXN_NAME_POOL_TTL - # thread exit execute ensure block, better option than kill for sleep @mutex.synchronize do - if @transaction_name[txn_name] - txn_name_exist = true - @transaction_name[txn_name].exit - end + txn_name_exist = @transaction_name.key?(txn_name) if txn_name_exist || @transaction_name.size < MAX_CARDINALITY @cache[key] = txn_name - @transaction_name[txn_name] = Thread.new { cleanup_txn(key, txn_name) } + @transaction_name[txn_name] = expiry + heap_push([expiry, txn_name]) + ensure_cleanup_thread + @cleanup_cond.signal else @cache[key] = DEFAULT_TXN_NAME end @@ -85,12 +85,97 @@ def delete_root_context_h(key) end end - def cleanup_txn(_key, txn_name) - sleep(TXN_NAME_POOL_TTL) - ensure + # Stops the cleanup thread. Call on shutdown to avoid thread leakage. + def stop @mutex.synchronize do - @transaction_name.delete(txn_name) + @stopped = true + @cleanup_cond.signal + end + @cleanup_thread&.join + end + + private + + def ensure_cleanup_thread + return if @cleanup_thread&.alive? + + @cleanup_thread = Thread.new { cleanup_loop } + @cleanup_thread.abort_on_exception = false + end + + # Single long-lived thread that sleeps until the next expiry, then + # removes all expired transaction names from the pool. + def cleanup_loop + @mutex.synchronize do + until @stopped + purge_expired_entries + + if @expiry_heap.empty? + @cleanup_cond.wait(@mutex) + else + wait_seconds = [@expiry_heap.first[0] - Time.now.to_i, 0].max + @cleanup_cond.wait(@mutex, wait_seconds) if wait_seconds.positive? + end + end + end + rescue StandardError => e + SolarWindsAPM.logger.warn { "[#{self.class}/#{__method__}] cleanup thread error: #{e.message}" } + end + + def purge_expired_entries + now = Time.now.to_i + while (entry = @expiry_heap.first) + break if entry[0] > now + + heap_pop + txn_name = entry[1] + # Only delete if the recorded expiry matches (not renewed) + @transaction_name.delete(txn_name) if @transaction_name[txn_name] == entry[0] end end + + # Min-heap operations (heap ordered by expiry time) + def heap_push(entry) + @expiry_heap << entry + sift_up(@expiry_heap.size - 1) + end + + def heap_pop + return if @expiry_heap.empty? + + swap(0, @expiry_heap.size - 1) + result = @expiry_heap.pop + sift_down(0) + result + end + + def sift_up(idx) + while idx.positive? + parent = (idx - 1) / 2 + break if @expiry_heap[parent][0] <= @expiry_heap[idx][0] + + swap(idx, parent) + idx = parent + end + end + + def sift_down(idx) + size = @expiry_heap.size + loop do + smallest = idx + left = (2 * idx) + 1 + right = (2 * idx) + 2 + smallest = left if left < size && @expiry_heap[left][0] < @expiry_heap[smallest][0] + smallest = right if right < size && @expiry_heap[right][0] < @expiry_heap[smallest][0] + break if smallest == idx + + swap(idx, smallest) + idx = smallest + end + end + + def swap(idx_i, idx_j) + @expiry_heap[idx_i], @expiry_heap[idx_j] = @expiry_heap[idx_j], @expiry_heap[idx_i] + end end end diff --git a/test/api/set_transaction_name_test.rb b/test/api/set_transaction_name_test.rb index f45e7648..cd6dfcc2 100644 --- a/test/api/set_transaction_name_test.rb +++ b/test/api/set_transaction_name_test.rb @@ -14,6 +14,7 @@ @span = create_span @dummy_span = create_span @dummy_span.context.instance_variable_set(:@span_id, 'fake_span_id') # with fake span_id, should still find the right root span + SolarWindsAPM::OTelConfig.class_variable_set(:@@agent_enabled, false) SolarWindsAPM::OTelConfig.initialize @solarwinds_processor = SolarWindsAPM::OTelConfig[:metrics_processor] end diff --git a/test/opentelemetry/otlp_processor_test.rb b/test/opentelemetry/otlp_processor_test.rb index 078d4212..fd25c21a 100644 --- a/test/opentelemetry/otlp_processor_test.rb +++ b/test/opentelemetry/otlp_processor_test.rb @@ -215,7 +215,7 @@ it 'test_meter_attributes_with_old_http_method' do span_limits = OpenTelemetry::SDK::Trace::SpanLimits.new - attributes = { 'http.method' => 'GET', 'http.status_code' => 200 } + attributes = { 'http.method' => 'GET', 'http.status_code' => 200, 'sw.transaction' => 'test_transaction' } span_context = OpenTelemetry::Trace::SpanContext.new( span_id: "1\xE1u\x12\x8E\xFC@\x18", trace_id: "w\xCBl\xCCR-1\x06\x11M\xD6\xEC\xBBp\x03j" @@ -236,7 +236,6 @@ nil ) span_data = span.to_span_data - @processor.instance_variable_set(:@transaction_name, 'test_transaction') result = @processor.send(:meter_attributes, span_data) _(result['http.method']).must_equal 'GET' @@ -247,7 +246,7 @@ it 'test_meter_attributes_with_new_http_request_method' do span_limits = OpenTelemetry::SDK::Trace::SpanLimits.new - attributes = { 'http.request.method' => 'POST', 'http.response.status_code' => 201 } + attributes = { 'http.request.method' => 'POST', 'http.response.status_code' => 201, 'sw.transaction' => 'test_transaction' } span_context = OpenTelemetry::Trace::SpanContext.new( span_id: "1\xE1u\x12\x8E\xFC@\x18", trace_id: "w\xCBl\xCCR-1\x06\x11M\xD6\xEC\xBBp\x03j" @@ -268,7 +267,6 @@ nil ) span_data = span.to_span_data - @processor.instance_variable_set(:@transaction_name, 'test_transaction') result = @processor.send(:meter_attributes, span_data) _(result['http.method']).must_equal 'POST' @@ -279,7 +277,7 @@ it 'test_meter_attributes_with_new_and_old_status_code_as_same_code' do span_limits = OpenTelemetry::SDK::Trace::SpanLimits.new - attributes = { 'http.request.method' => 'POST', 'http.status_code' => 201, 'http.response.status_code' => 201 } + attributes = { 'http.request.method' => 'POST', 'http.status_code' => 201, 'http.response.status_code' => 201, 'sw.transaction' => 'test_transaction' } span_context = OpenTelemetry::Trace::SpanContext.new( span_id: "1\xE1u\x12\x8E\xFC@\x18", trace_id: "w\xCBl\xCCR-1\x06\x11M\xD6\xEC\xBBp\x03j" @@ -300,7 +298,6 @@ nil ) span_data = span.to_span_data - @processor.instance_variable_set(:@transaction_name, 'test_transaction') result = @processor.send(:meter_attributes, span_data) _(result['http.method']).must_equal 'POST' @@ -311,7 +308,7 @@ it 'test_meter_attributes_with_new_and_old_status_code_as_different_code' do span_limits = OpenTelemetry::SDK::Trace::SpanLimits.new - attributes = { 'http.request.method' => 'POST', 'http.status_code' => 200, 'http.response.status_code' => 201 } + attributes = { 'http.request.method' => 'POST', 'http.status_code' => 200, 'http.response.status_code' => 201, 'sw.transaction' => 'test_transaction' } span_context = OpenTelemetry::Trace::SpanContext.new( span_id: "1\xE1u\x12\x8E\xFC@\x18", trace_id: "w\xCBl\xCCR-1\x06\x11M\xD6\xEC\xBBp\x03j" @@ -332,7 +329,6 @@ nil ) span_data = span.to_span_data - @processor.instance_variable_set(:@transaction_name, 'test_transaction') result = @processor.send(:meter_attributes, span_data) _(result['http.method']).must_equal 'POST' @@ -343,7 +339,7 @@ it 'test_meter_attributes_non_http_span' do span_limits = OpenTelemetry::SDK::Trace::SpanLimits.new - attributes = {} + attributes = { 'sw.transaction' => 'test_transaction' } span_context = OpenTelemetry::Trace::SpanContext.new( span_id: "1\xE1u\x12\x8E\xFC@\x18", trace_id: "w\xCBl\xCCR-1\x06\x11M\xD6\xEC\xBBp\x03j" @@ -364,7 +360,6 @@ nil ) span_data = span.to_span_data - @processor.instance_variable_set(:@transaction_name, 'test_transaction') result = @processor.send(:span_http?, span_data) _(result).must_equal false diff --git a/test/solarwinds_apm/otel_config_log_bridge_test.rb b/test/solarwinds_apm/otel_config_log_bridge_test.rb index a1abaf26..cab720ba 100644 --- a/test/solarwinds_apm/otel_config_log_bridge_test.rb +++ b/test/solarwinds_apm/otel_config_log_bridge_test.rb @@ -11,6 +11,10 @@ describe 'Log Bridge Initialization Test' do describe 'check if log bridge is enabled' do + before do + SolarWindsAPM::OTelConfig.class_variable_set(:@@agent_enabled, false) + end + after do ENV.delete('OTEL_RUBY_INSTRUMENTATION_LOGGER_ENABLED') end diff --git a/test/solarwinds_apm/otel_config_propagator_test.rb b/test/solarwinds_apm/otel_config_propagator_test.rb index b035ec06..7a412355 100644 --- a/test/solarwinds_apm/otel_config_propagator_test.rb +++ b/test/solarwinds_apm/otel_config_propagator_test.rb @@ -12,7 +12,7 @@ describe 'Loading Opentelemetry Test' do before do clean_old_setting - SolarWindsAPM::OTelConfig.class_variable_set(:@@agent_enabled, true) + SolarWindsAPM::OTelConfig.class_variable_set(:@@agent_enabled, false) SolarWindsAPM::OTelConfig.class_variable_set(:@@config, {}) SolarWindsAPM::OTelConfig.class_variable_set(:@@config_map, {}) end diff --git a/test/support/txn_name_manager_test.rb b/test/support/txn_name_manager_test.rb index badde4c9..d74852cb 100644 --- a/test/support/txn_name_manager_test.rb +++ b/test/support/txn_name_manager_test.rb @@ -9,7 +9,10 @@ describe 'SolarWindsTXNNameManangerTest.rb' do before do @txn_manager = SolarWindsAPM::TxnNameManager.new - @txn_manager.set('c', 'd') + end + + after do + @txn_manager.stop end it 'test_set' do @@ -23,11 +26,13 @@ end it 'test_del' do + @txn_manager.set('c', 'd') @txn_manager.del('c') assert_nil(@txn_manager.get('c')) end it 'test_get' do + @txn_manager.set('c', 'd') _(@txn_manager.get('c')).must_equal 'd' end @@ -36,7 +41,165 @@ _(@txn_manager.get_root_context_h('key1')).must_equal 'abcd' end - it 'transaction_name have thread' do - _(@txn_manager.instance_variable_get(:@transaction_name)['d'].class).must_equal Thread + it 'transaction_name tracks expiry' do + @txn_manager.set('c', 'd') + expiry = @txn_manager.instance_variable_get(:@transaction_name)['d'] + _(expiry.class).must_equal Integer + assert expiry > Time.now.to_i + end + + describe 'cardinality management' do + it 'caps transaction name at DEFAULT_TXN_NAME when cardinality limit reached' do + SolarWindsAPM::TxnNameManager::MAX_CARDINALITY.times do |i| + @txn_manager.set("key_#{i}", "unique_name_#{i}") + end + + @txn_manager.set('overflow_key', 'overflow_name') + _(@txn_manager.get('overflow_key')).must_equal SolarWindsAPM::TxnNameManager::DEFAULT_TXN_NAME + end + + it 'allows renewal of existing name when pool is at max cardinality' do + SolarWindsAPM::TxnNameManager::MAX_CARDINALITY.times do |i| + @txn_manager.set("key_#{i}", "unique_name_#{i}") + end + + # 'unique_name_0' is already in pool — setting another key to it should succeed + @txn_manager.set('new_key', 'unique_name_0') + _(@txn_manager.get('new_key')).must_equal 'unique_name_0' + end + + it 'renewal updates the expiry for an existing transaction name' do + @txn_manager.set('c', 'd') + old_expiry = @txn_manager.instance_variable_get(:@transaction_name)['d'] + sleep(1) + @txn_manager.set('c', 'd') + new_expiry = @txn_manager.instance_variable_get(:@transaction_name)['d'] + assert new_expiry > old_expiry + end + end + + describe 'heap structure' do + it 'expiry heap size grows with each unique transaction name' do + initial_size = @txn_manager.instance_variable_get(:@expiry_heap).size + @txn_manager.set('k1', 'alpha') + @txn_manager.set('k2', 'beta') + + heap = @txn_manager.instance_variable_get(:@expiry_heap) + _(heap.size).must_equal initial_size + 2 + end + + it 'expiry heap root holds the earliest expiry' do + @txn_manager.set('k1', 'alpha') + @txn_manager.set('k2', 'beta') + @txn_manager.set('k3', 'gamma') + + heap = @txn_manager.instance_variable_get(:@expiry_heap) + heap.each_with_index do |entry, i| + next if i.zero? + + assert heap[0][0] <= entry[0], + "heap root expiry #{heap[0][0]} should be <= heap[#{i}] expiry #{entry[0]}" + end + end + + it 'heap grows by one entry when same name is set again (renewal appends)' do + @txn_manager.set('c', 'd') # seed an entry + size_before = @txn_manager.instance_variable_get(:@expiry_heap).size + @txn_manager.set('c', 'd') # renew existing name + size_after = @txn_manager.instance_variable_get(:@expiry_heap).size + _(size_after).must_equal size_before + 1 + end + end + + describe 'purge_expired_entries' do + it 'purge_expired_entries removes expired transaction names from pool' do + past_expiry = Time.now.to_i - 10 + + # Simulate an already-expired entry by overwriting state directly + @txn_manager.instance_variable_get(:@transaction_name)['d'] = past_expiry + @txn_manager.instance_variable_get(:@expiry_heap).clear + @txn_manager.send(:heap_push, [past_expiry, 'd']) + + @txn_manager.send(:purge_expired_entries) + + assert_nil @txn_manager.instance_variable_get(:@transaction_name)['d'] + _(@txn_manager.instance_variable_get(:@expiry_heap)).must_be_empty + end + + it 'purge_expired_entries leaves names whose expiry is still in the future' do + future_expiry = Time.now.to_i + 60 + + @txn_manager.instance_variable_get(:@transaction_name)['d'] = future_expiry + @txn_manager.instance_variable_get(:@expiry_heap).clear + @txn_manager.send(:heap_push, [future_expiry, 'd']) + + @txn_manager.send(:purge_expired_entries) + + _(@txn_manager.instance_variable_get(:@transaction_name)['d']).must_equal future_expiry + end + + it 'renewal guard: stale heap entry does not evict a renewed transaction name' do + stale_expiry = Time.now.to_i - 10 + fresh_expiry = Time.now.to_i + 60 + + # Simulate renewal: @transaction_name holds new expiry, heap still has old entry + @txn_manager.instance_variable_get(:@transaction_name)['d'] = fresh_expiry + @txn_manager.instance_variable_get(:@expiry_heap).clear + @txn_manager.send(:heap_push, [stale_expiry, 'd']) + + @txn_manager.send(:purge_expired_entries) + + # 'd' must survive because stored expiry (fresh) != stale heap entry's expiry + _(@txn_manager.instance_variable_get(:@transaction_name)['d']).must_equal fresh_expiry + end + + it 'purge_expired_entries stops at first non-expired entry (min-heap ordering)' do + now = Time.now.to_i + expired = now - 5 + future1 = now + 30 + future2 = now + 60 + + txn_names = @txn_manager.instance_variable_get(:@transaction_name) + txn_names['expired_a'] = expired + txn_names['live_b'] = future1 + txn_names['live_c'] = future2 + + @txn_manager.instance_variable_get(:@expiry_heap).clear + @txn_manager.send(:heap_push, [expired, 'expired_a']) + @txn_manager.send(:heap_push, [future1, 'live_b']) + @txn_manager.send(:heap_push, [future2, 'live_c']) + + @txn_manager.send(:purge_expired_entries) + + assert_nil txn_names['expired_a'] + _(@txn_manager.instance_variable_get(:@transaction_name)['live_b']).must_equal future1 + _(@txn_manager.instance_variable_get(:@transaction_name)['live_c']).must_equal future2 + end + end + + describe 'cleanup thread lifecycle' do + it 'cleanup thread is started on first set and is alive' do + @txn_manager.set('a', 'b') + thread = @txn_manager.instance_variable_get(:@cleanup_thread) + assert thread.alive? + end + + it 'stop terminates the cleanup thread' do + @txn_manager.set('a', 'b') + thread = @txn_manager.instance_variable_get(:@cleanup_thread) + assert thread.alive? + + @txn_manager.stop + + refute thread.alive? + end + + it 'stop is idempotent and does not raise when called twice' do + @txn_manager.stop + assert @txn_manager.instance_variable_get(:@stopped) + + # A second stop should not raise even though the thread is already dead + @txn_manager.stop + end end end