Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions lib/action_cable/subscription_adapter/solid_cable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ class SolidCable < ::ActionCable::SubscriptionAdapter::Base

def initialize(*)
super
@mutex =
if defined?(@server)
@server.mutex
else
Mutex.new
end

@listener = nil
end

Expand All @@ -33,11 +40,20 @@ def unsubscribe(channel, callback)

private
def listener
@listener || @server.mutex.synchronize do
@listener ||= Listener.new(@server.event_loop)
@listener || @mutex.synchronize do
@listener ||= Listener.new(self, pubsub_executor)
end
end

def pubsub_executor
@pubsub_executor ||=
if respond_to?(:executor, true)
executor
else
@server.event_loop
end
end

class Listener < ::ActionCable::SubscriptionAdapter::SubscriberMap
CONNECTION_ERRORS = [
ActiveRecord::ConnectionFailed,
Expand All @@ -46,10 +62,12 @@ class Listener < ::ActionCable::SubscriptionAdapter::SubscriberMap
]
Stop = Class.new(Exception)

def initialize(event_loop)
super()
delegate :logger, to: :@adapter

@event_loop = event_loop
def initialize(adapter, executor)
super()
@adapter = adapter
@executor = executor

# Critical section begins with 0 permits. It can be understood as
# being "normally held" by the listener thread. It is released
Expand Down Expand Up @@ -105,19 +123,19 @@ def shutdown

def add_channel(channel, on_success)
channels[channel] = last_message_id
event_loop.post(&on_success) if on_success
on_success.call if on_success
end

def remove_channel(channel)
channels.delete(channel)
end

def invoke_callback(*)
event_loop.post { super }
executor.post { super }
end

private
attr_reader :event_loop, :thread
attr_reader :executor, :thread
attr_accessor :last_id, :reconnect_attempt

def last_message_id
Expand Down