diff --git a/lib/action_cable/subscription_adapter/solid_cable.rb b/lib/action_cable/subscription_adapter/solid_cable.rb index d4e873d..679ee91 100644 --- a/lib/action_cable/subscription_adapter/solid_cable.rb +++ b/lib/action_cable/subscription_adapter/solid_cable.rb @@ -12,6 +12,13 @@ class SolidCable < ::ActionCable::SubscriptionAdapter::Base def initialize(*) super + @mutex = + if defined?(@server) + @server.mutex + else + Mutex.new + end + @listener = nil end @@ -33,11 +40,20 @@ def unsubscribe(channel, callback) private def listener - @listener || @server.mutex.synchronize do - @listener ||= Listener.new(@server.event_loop) + @listener || @mutex.synchronize do + @listener ||= Listener.new(self, pubsub_executor) end end + def pubsub_executor + @pubsub_executor ||= + if respond_to?(:executor, true) + executor + else + @server.event_loop + end + end + class Listener < ::ActionCable::SubscriptionAdapter::SubscriberMap CONNECTION_ERRORS = [ ActiveRecord::ConnectionFailed, @@ -46,10 +62,12 @@ class Listener < ::ActionCable::SubscriptionAdapter::SubscriberMap ] Stop = Class.new(Exception) - def initialize(event_loop) - super() + delegate :logger, to: :@adapter - @event_loop = event_loop + def initialize(adapter, executor) + super() + @adapter = adapter + @executor = executor # Critical section begins with 0 permits. It can be understood as # being "normally held" by the listener thread. It is released @@ -105,7 +123,7 @@ def shutdown def add_channel(channel, on_success) channels[channel] = last_message_id - event_loop.post(&on_success) if on_success + on_success.call if on_success end def remove_channel(channel) @@ -113,11 +131,11 @@ def remove_channel(channel) end def invoke_callback(*) - event_loop.post { super } + executor.post { super } end private - attr_reader :event_loop, :thread + attr_reader :executor, :thread attr_accessor :last_id, :reconnect_attempt def last_message_id