Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions lib/mixpanel-ruby/flags/local_flags_provider.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ def initialize(token, config, tracker_callback, error_handler)
@flag_definitions = {}
@polling_thread = nil
@stop_polling = false
@polling_mutex = Mutex.new
@polling_condition = ConditionVariable.new
end

# Start polling for flag definitions
Expand All @@ -43,8 +45,17 @@ def start_polling_for_definitions!
@stop_polling = false
@polling_thread = Thread.new do
loop do
sleep @config[:polling_interval_in_seconds]
break if @stop_polling
# Check @stop_polling INSIDE the mutex (before and after wait) so a
# broadcast from stop_polling_for_definitions! can't be lost if it
# arrives while we're outside the synchronized region (e.g. during
# fetch_flag_definitions below).
stopped = @polling_mutex.synchronize do
next true if @stop_polling

@polling_condition.wait(@polling_mutex, @config[:polling_interval_in_seconds])
@stop_polling
end
break if stopped

begin
fetch_flag_definitions
Expand All @@ -59,7 +70,10 @@ def start_polling_for_definitions!
end

def stop_polling_for_definitions!
@stop_polling = true
@polling_mutex.synchronize do
@stop_polling = true
@polling_condition.broadcast
end
@polling_thread&.join
@polling_thread = nil
end
Expand Down
75 changes: 75 additions & 0 deletions spec/mixpanel-ruby/flags/local_flags_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -755,5 +755,80 @@ def user_context_with_properties(properties)
polling_provider.stop_polling_for_definitions!
end
end

it 'shuts down promptly while the polling thread is waiting on the interval' do
flag = create_test_flag
stub_flag_definitions([flag])

polling_provider = Mixpanel::Flags::LocalFlagsProvider.new(
test_token,
# A long interval ensures the polling thread is parked on the wait when
# shutdown is requested — pre-fix, the join would have to ride out the
# full interval before the thread checked @stop_polling.
{ enable_polling: true, polling_interval_in_seconds: 30 },
mock_tracker,
mock_error_handler
)

polling_provider.start_polling_for_definitions!
sleep 0.1 # let the polling thread enter the condition wait

t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
polling_provider.stop_polling_for_definitions!
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
expect(elapsed).to be < 1.0
end

it 'shuts down promptly when stop races an in-flight fetch' do
flag = create_test_flag
fetch_count = 0
fetch_count_mutex = Mutex.new
second_fetch_started = Queue.new
second_fetch_release = Queue.new

stub_request(:get, endpoint_url_regex)
.to_return do |_request|
this_call = fetch_count_mutex.synchronize { fetch_count += 1 }
if this_call == 2
second_fetch_started << true
second_fetch_release.pop
end
{
status: 200,
body: { code: 200, flags: [flag] }.to_json,
headers: { 'Content-Type' => 'application/json' }
}
end

polling_provider = Mixpanel::Flags::LocalFlagsProvider.new(
test_token,
# 1 s interval — large enough that the lost-wakeup race (if reintroduced)
# would be detectable as ~1 s of extra teardown after the fetch finishes,
# but small enough that the second polling-thread fetch starts quickly.
{ enable_polling: true, polling_interval_in_seconds: 1.0 },
mock_tracker,
mock_error_handler
)

polling_provider.start_polling_for_definitions!
second_fetch_started.pop # 2nd fetch is now blocked in the polling thread

# Trigger shutdown while the polling thread is mid-fetch (NOT on the CV),
# so the broadcast goes to no waiter. Run it in a thread so we can release
# the fetch afterwards.
stopper = Thread.new { polling_provider.stop_polling_for_definitions! }
sleep 0.05 # give stopper time to set @stop_polling + broadcast

t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
second_fetch_release << true
stopper.join
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0

# With the predicate-inside-mutex check, the polling thread re-enters the
# mutex after fetch, sees @stop_polling = true, skips the wait, and breaks
# immediately. Without it, the thread would call wait(1.0 s), ride out the
# full interval, and only then break — elapsed would be ~1 s.
expect(elapsed).to be < 0.5
end
end
end
Loading