From ce11f0214837e718ce3abb39947136b48aec8299 Mon Sep 17 00:00:00 2001 From: Tyler Roach Date: Mon, 15 Jun 2026 15:59:39 -0400 Subject: [PATCH 1/2] Make polling loop shutdown promptly via condition variable stop_polling_for_definitions! previously took up to polling_interval_in_seconds to return (3 min teardown observed during cross-SDK audit at the default 60s interval) because the polling thread blocked on an uninterruptible sleep before checking @stop_polling. Replace the sleep with Mutex#synchronize + ConditionVariable#wait, and have stop_polling_for_definitions! broadcast the condition so the thread wakes immediately. Shutdown is now ~0.1ms in a smoke test. Co-Authored-By: Claude Opus 4.7 --- lib/mixpanel-ruby/flags/local_flags_provider.rb | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/mixpanel-ruby/flags/local_flags_provider.rb b/lib/mixpanel-ruby/flags/local_flags_provider.rb index 9bf1bb0..db23fa1 100644 --- a/lib/mixpanel-ruby/flags/local_flags_provider.rb +++ b/lib/mixpanel-ruby/flags/local_flags_provider.rb @@ -32,6 +32,8 @@ def initialize(token, config, tracker_callback, error_handler) @flag_definitions = {} @polling_thread = nil @stop_polling = false + @polling_mutex = Mutex.new + @polling_condition = ConditionVariable.new end # Start polling for flag definitions @@ -43,7 +45,9 @@ def start_polling_for_definitions! @stop_polling = false @polling_thread = Thread.new do loop do - sleep @config[:polling_interval_in_seconds] + @polling_mutex.synchronize do + @polling_condition.wait(@polling_mutex, @config[:polling_interval_in_seconds]) + end break if @stop_polling begin @@ -59,7 +63,10 @@ def start_polling_for_definitions! end def stop_polling_for_definitions! - @stop_polling = true + @polling_mutex.synchronize do + @stop_polling = true + @polling_condition.broadcast + end @polling_thread&.join @polling_thread = nil end From deac4ef29775c29492204748d6558d8f4e1fcea9 Mon Sep 17 00:00:00 2001 From: Tyler Roach Date: Tue, 16 Jun 2026 08:55:49 -0400 Subject: [PATCH 2/2] Check @stop_polling inside mutex to close lost-wakeup race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous commit's polling loop called ConditionVariable#wait inside @polling_mutex but checked @stop_polling outside it: @polling_mutex.synchronize do @polling_condition.wait(@polling_mutex, interval) end break if @stop_polling If stop_polling_for_definitions! set @stop_polling = true and called broadcast while the polling thread was outside the mutex (typically mid fetch_flag_definitions), the broadcast had no waiter and was lost. The thread would then re-enter the synchronized region and call wait(interval) again, riding out the full polling interval before observing @stop_polling — reintroducing the original slow-shutdown bug for the duration of one extra tick. Move the predicate check inside the synchronized region so the polling thread sees @stop_polling consistently with the broadcast and skips the wait when shutdown has already been requested. Add two regression specs: - shutdown latency while the thread is parked on the CV (basic case) - shutdown latency when shutdown races an in-flight fetch (race case) Without the predicate-inside-mutex fix, the race spec fails with elapsed ~= polling_interval_in_seconds. With it, elapsed is sub-millisecond. Co-Authored-By: Claude Opus 4.7 --- .../flags/local_flags_provider.rb | 11 ++- spec/mixpanel-ruby/flags/local_flags_spec.rb | 75 +++++++++++++++++++ 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/lib/mixpanel-ruby/flags/local_flags_provider.rb b/lib/mixpanel-ruby/flags/local_flags_provider.rb index db23fa1..0ba5ba3 100644 --- a/lib/mixpanel-ruby/flags/local_flags_provider.rb +++ b/lib/mixpanel-ruby/flags/local_flags_provider.rb @@ -45,10 +45,17 @@ def start_polling_for_definitions! @stop_polling = false @polling_thread = Thread.new do loop do - @polling_mutex.synchronize do + # Check @stop_polling INSIDE the mutex (before and after wait) so a + # broadcast from stop_polling_for_definitions! can't be lost if it + # arrives while we're outside the synchronized region (e.g. during + # fetch_flag_definitions below). + stopped = @polling_mutex.synchronize do + next true if @stop_polling + @polling_condition.wait(@polling_mutex, @config[:polling_interval_in_seconds]) + @stop_polling end - break if @stop_polling + break if stopped begin fetch_flag_definitions diff --git a/spec/mixpanel-ruby/flags/local_flags_spec.rb b/spec/mixpanel-ruby/flags/local_flags_spec.rb index 6969521..62bb452 100644 --- a/spec/mixpanel-ruby/flags/local_flags_spec.rb +++ b/spec/mixpanel-ruby/flags/local_flags_spec.rb @@ -755,5 +755,80 @@ def user_context_with_properties(properties) polling_provider.stop_polling_for_definitions! end end + + it 'shuts down promptly while the polling thread is waiting on the interval' do + flag = create_test_flag + stub_flag_definitions([flag]) + + polling_provider = Mixpanel::Flags::LocalFlagsProvider.new( + test_token, + # A long interval ensures the polling thread is parked on the wait when + # shutdown is requested — pre-fix, the join would have to ride out the + # full interval before the thread checked @stop_polling. + { enable_polling: true, polling_interval_in_seconds: 30 }, + mock_tracker, + mock_error_handler + ) + + polling_provider.start_polling_for_definitions! + sleep 0.1 # let the polling thread enter the condition wait + + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + polling_provider.stop_polling_for_definitions! + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + expect(elapsed).to be < 1.0 + end + + it 'shuts down promptly when stop races an in-flight fetch' do + flag = create_test_flag + fetch_count = 0 + fetch_count_mutex = Mutex.new + second_fetch_started = Queue.new + second_fetch_release = Queue.new + + stub_request(:get, endpoint_url_regex) + .to_return do |_request| + this_call = fetch_count_mutex.synchronize { fetch_count += 1 } + if this_call == 2 + second_fetch_started << true + second_fetch_release.pop + end + { + status: 200, + body: { code: 200, flags: [flag] }.to_json, + headers: { 'Content-Type' => 'application/json' } + } + end + + polling_provider = Mixpanel::Flags::LocalFlagsProvider.new( + test_token, + # 1 s interval — large enough that the lost-wakeup race (if reintroduced) + # would be detectable as ~1 s of extra teardown after the fetch finishes, + # but small enough that the second polling-thread fetch starts quickly. + { enable_polling: true, polling_interval_in_seconds: 1.0 }, + mock_tracker, + mock_error_handler + ) + + polling_provider.start_polling_for_definitions! + second_fetch_started.pop # 2nd fetch is now blocked in the polling thread + + # Trigger shutdown while the polling thread is mid-fetch (NOT on the CV), + # so the broadcast goes to no waiter. Run it in a thread so we can release + # the fetch afterwards. + stopper = Thread.new { polling_provider.stop_polling_for_definitions! } + sleep 0.05 # give stopper time to set @stop_polling + broadcast + + t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) + second_fetch_release << true + stopper.join + elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0 + + # With the predicate-inside-mutex check, the polling thread re-enters the + # mutex after fetch, sees @stop_polling = true, skips the wait, and breaks + # immediately. Without it, the thread would call wait(1.0 s), ride out the + # full interval, and only then break — elapsed would be ~1 s. + expect(elapsed).to be < 0.5 + end end end