diff --git a/lib/mixpanel-ruby/flags/local_flags_provider.rb b/lib/mixpanel-ruby/flags/local_flags_provider.rb index 9bf1bb0..0ba5ba3 100644 --- a/lib/mixpanel-ruby/flags/local_flags_provider.rb +++ b/lib/mixpanel-ruby/flags/local_flags_provider.rb @@ -32,6 +32,8 @@ def initialize(token, config, tracker_callback, error_handler) @flag_definitions = {} @polling_thread = nil @stop_polling = false + @polling_mutex = Mutex.new + @polling_condition = ConditionVariable.new end # Start polling for flag definitions @@ -43,8 +45,17 @@ def start_polling_for_definitions! @stop_polling = false @polling_thread = Thread.new do loop do - sleep @config[:polling_interval_in_seconds] - break if @stop_polling + # Check @stop_polling INSIDE the mutex (before and after wait) so a + # broadcast from stop_polling_for_definitions! can't be lost if it + # arrives while we're outside the synchronized region (e.g. during + # fetch_flag_definitions below). + stopped = @polling_mutex.synchronize do + next true if @stop_polling + + @polling_condition.wait(@polling_mutex, @config[:polling_interval_in_seconds]) + @stop_polling + end + break if stopped begin fetch_flag_definitions @@ -59,7 +70,10 @@ def start_polling_for_definitions! end def stop_polling_for_definitions! - @stop_polling = true + @polling_mutex.synchronize do + @stop_polling = true + @polling_condition.broadcast + end @polling_thread&.join @polling_thread = nil end diff --git a/spec/mixpanel-ruby/flags/local_flags_spec.rb b/spec/mixpanel-ruby/flags/local_flags_spec.rb index 6969521..62bb452 100644 --- a/spec/mixpanel-ruby/flags/local_flags_spec.rb +++ b/spec/mixpanel-ruby/flags/local_flags_spec.rb @@ -755,5 +755,80 @@ def user_context_with_properties(properties) polling_provider.stop_polling_for_definitions! end end + + it 'shuts down promptly while the polling thread is waiting on the interval' do + flag = create_test_flag + stub_flag_definitions([flag]) + + polling_provider = Mixpanel::Flags::LocalFlagsProvider.new( + test_token, + # A long interval ensures the polling thread is parked on the wait when + # shutdown is requested — pre-fix, the join would have to ride out the + # full interval before the thread checked @stop_polling. + { enable_polling: true, polling_interval_in_seconds: 30 }, + mock_tracker, + mock_error_handler + ) + + polling_provider.start_polling_for_definitions! + sleep 0.1 # let the polling thread enter the condition wait + + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + polling_provider.stop_polling_for_definitions! + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + expect(elapsed).to be < 1.0 + end + + it 'shuts down promptly when stop races an in-flight fetch' do + flag = create_test_flag + fetch_count = 0 + fetch_count_mutex = Mutex.new + second_fetch_started = Queue.new + second_fetch_release = Queue.new + + stub_request(:get, endpoint_url_regex) + .to_return do |_request| + this_call = fetch_count_mutex.synchronize { fetch_count += 1 } + if this_call == 2 + second_fetch_started << true + second_fetch_release.pop + end + { + status: 200, + body: { code: 200, flags: [flag] }.to_json, + headers: { 'Content-Type' => 'application/json' } + } + end + + polling_provider = Mixpanel::Flags::LocalFlagsProvider.new( + test_token, + # 1 s interval — large enough that the lost-wakeup race (if reintroduced) + # would be detectable as ~1 s of extra teardown after the fetch finishes, + # but small enough that the second polling-thread fetch starts quickly. + { enable_polling: true, polling_interval_in_seconds: 1.0 }, + mock_tracker, + mock_error_handler + ) + + polling_provider.start_polling_for_definitions! + second_fetch_started.pop # 2nd fetch is now blocked in the polling thread + + # Trigger shutdown while the polling thread is mid-fetch (NOT on the CV), + # so the broadcast goes to no waiter. Run it in a thread so we can release + # the fetch afterwards. + stopper = Thread.new { polling_provider.stop_polling_for_definitions! } + sleep 0.05 # give stopper time to set @stop_polling + broadcast + + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + second_fetch_release << true + stopper.join + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + + # With the predicate-inside-mutex check, the polling thread re-enters the + # mutex after fetch, sees @stop_polling = true, skips the wait, and breaks + # immediately. Without it, the thread would call wait(1.0 s), ride out the + # full interval, and only then break — elapsed would be ~1 s. + expect(elapsed).to be < 0.5 + end end end