Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion lib/warren/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,26 @@ def control_loop
stopped!
else
# Prompt any sleeping workers to check if they need to recover
foxes.each(&:attempt_recovery)
foxes.each do |fox|
pause_running_fox_without_consumer(fox)
fox.attempt_recovery
end
sleep(SECONDS_TO_SLEEP)
end
end

# Forces a running fox into the normal pause/recovery flow when it has no
# active consumer object.
#
# This handles cases where a fox is marked running but has no active consumer.
#
# @param fox [Warren::Fox] The fox to inspect
# @return [void]
def pause_running_fox_without_consumer(fox)
return unless fox.running? && !fox.consumer_present?

fox.warn { 'Consumer missing while running; pausing to trigger recovery' }
fox.pause!
end
end
end
21 changes: 20 additions & 1 deletion lib/warren/delay_exchange.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ class DelayExchange
# @param channel [Warren::Handler::Broadcast::Channel] A channel on which to register queues
# @param config [Hash] queue configuration hash
#
def initialize(channel:, config:)
def initialize(channel:, config:, channel_factory: nil)
@channel = channel
# Use a factory so DelayExchange can recover from channel closure.
# In Fox recovery, reopen! is called with subscription.channel so
# Subscription and DelayExchange stay on the same channel.
# If no channel is passed, reopen! creates one via channel_factory.
@channel_factory = channel_factory
@exchange_config = config&.fetch('exchange', nil)
@bindings = config&.fetch('bindings', []) || []
end
Expand All @@ -33,6 +38,20 @@ def activate!
establish_bindings!
end

# Rebinds DelayExchange to a healthy channel after broker-side closure.
# Resets the memoized @exchange because exchange instances are tied to
# the old channel and must be rebuilt on the replacement channel.
#
# @param channel [Warren::Handler::Broadcast::Channel, nil]
# Optional channel to reuse instead of creating a new one.
def reopen!(channel: nil)
raise StandardError, 'No channel factory configured' unless @channel_factory || channel

@channel = channel || @channel_factory.call
@exchange = nil
self
end

#
# Post a message to the delay exchange.
#
Expand Down
14 changes: 7 additions & 7 deletions lib/warren/den.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ def consumer_config
#
# @return [Warren::Fox]
def spawn_fox
# We don't use with_channel as our consumer persists outside the block,
# and while we *can* share channels between consumers it results in them
# sharing the same worker pool. This process lets us control workers on
# a per-queue basis. Currently that just means one worker per consumer.
channel = Warren.handler.new_channel(worker_count: worker_count)
subscription = Warren::Subscription.new(channel: channel, config: queue_config)
delay = Warren::DelayExchange.new(channel: channel, config: delay_config)
# A fox is long-lived, so it needs its own channel instance.
# Use `channel_factory` so recovery can create a fresh channel if
# RabbitMQ closes the current one.
channel_factory = -> { Warren.handler.new_channel(worker_count: worker_count) }
channel = channel_factory.call
subscription = Warren::Subscription.new(channel: channel, config: queue_config, channel_factory: channel_factory)
delay = Warren::DelayExchange.new(channel: channel, config: delay_config, channel_factory: channel_factory)
Warren::Fox.new(name: @app_name,
subscription: subscription,
adaptor: @adaptor,
Expand Down
12 changes: 9 additions & 3 deletions lib/warren/exceptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,19 @@
module Warren
# Exceptions used by the warren gem
module Exceptions
# Top level error class for Warren exceptions.
class Error < StandardError; end

# raise {Warren::Exceptions::TemporaryIssue} in a {Warren::Subscriber} to
# nack the message, requeuing it, and sending the consumers into sleep
# mode until the issue resolves itself.
TemporaryIssue = Class.new(StandardError)
class TemporaryIssue < Error; end

# {Warren::Exceptions::Exceptions::MultipleAcknowledgements} is raised if a message
# {Warren::Exceptions::MultipleAcknowledgements} is raised if a message
# is acknowledged, or rejected (nacked) multiple times.
MultipleAcknowledgements = Class.new(StandardError)
class MultipleAcknowledgements < Error; end

# Raised when the Bunny session cannot be started.
class SessionStartError < Error; end
end
end
224 changes: 206 additions & 18 deletions lib/warren/fox.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'forwardable'
require 'bunny'
require 'digest'
require 'warren'
require 'warren/helpers/state_machine'
require 'warren/subscriber/base'
Expand All @@ -11,6 +12,7 @@
module Warren
# A fox is a rabbitMQ consumer. It handles subscription to the queue
# and passing message on to the registered Subscriber
# rubocop:disable Metrics/ClassLength
class Fox
# A little cute fox emoji to easily flag output from the consumers
FOX = '🦊'
Expand Down Expand Up @@ -41,6 +43,9 @@ def initialize(name:, subscription:, adaptor:, subscribed_class:, delayed:)
@adaptor = adaptor
@subscribed_class = subscribed_class
@state = :initialized
# Remember messages that could not be dead-lettered because RabbitMQ had
# already closed the channel, so they can be dead-lettered if redelivered.
@pending_dead_letter_messages = Set.new
end

states :stopping, :stopped, :paused, :starting, :started, :running
Expand Down Expand Up @@ -91,25 +96,33 @@ def pause!
paused!
end

# If the fox is paused, and a recovery attempt is scheduled, will prompt
# the framework adaptor to attempt to recover. (Such as reconnecting to the
# database). If this operation is successful will resubscribe to the queue,
# otherwise a further recovery attempt will be scheduled. Successive recovery
# attempts will be gradually further apart, up to the MAX_RECONNECT_DELAY
# of 5 minutes.
# If the fox is paused and a recovery attempt is due, asks the framework
# adaptor whether recovery has succeeded (such as reconnecting to the
# database). If so, it reopens its subscriptions and resubscribes;
# otherwise a further recovery attempt will be scheduled. Successive
# recovery attempts will be gradually further apart, up to the
# MAX_RECONNECT_DELAY of 5 minutes.
def attempt_recovery
return unless paused? && recovery_due?

warn { "Attempting recovery: #{@recovery_attempts}" }
if recovered?
running!
subscribe!
recover_subscriptions!
else
@recovery_attempts += 1
@recover_at = Time.now + delay_for_attempt
schedule_recovery_attempt
end
end

# Returns whether a Bunny consumer is currently registered for this fox.
#
# Used by the client control loop to detect in-process cases where the fox
# is marked running but has no active consumer.
#
# @return [Boolean]
def consumer_present?
!@consumer.nil?
end

private

# Our consumer operates in another thread. It is non blocking.
Expand All @@ -125,6 +138,9 @@ def subscribe!
def unsubscribe!
info { 'Unsubscribing' }
@consumer&.cancel
rescue Bunny::Exception, Timeout::Error => e
warn { "Unsubscribe skipped (channel likely closed): #{e.class}: #{e.message}" }
ensure
@consumer = nil
info { 'Unsubscribed' }
end
Expand All @@ -141,17 +157,188 @@ def recovered?
@adaptor.recovered?
end

# Processes one delivered message.
#
# Flow:
# 1. Build a subscriber message wrapper.
# 2. If payload is marked pending dead-letter, force dead-letter.
# 3. Otherwise run subscriber processing + ack.
# 4. On {Warren::Exceptions::TemporaryIssue}, pause and requeue.
# 5. On any other error, log and dead-letter.
#
# @param delivery_info [Bunny::DeliveryInfo] Delivery metadata from Bunny
# @param properties [Bunny::MessageProperties] Message properties and headers
# @param payload [String] Raw message payload
# @return [void]
def process(delivery_info, properties, payload)
message = @subscribed_class.new(self, delivery_info, properties, payload)

log_message(payload) do
message = @subscribed_class.new(self, delivery_info, properties, payload)
@adaptor.handle { message._process_ }
rescue Warren::Exceptions::TemporaryIssue => e
warn { "Temporary Issue: #{e.message}" }
pause!
message.requeue(e)
rescue StandardError => e
message.dead_letter(e)
handle_message(message, payload)
end
rescue Warren::Exceptions::TemporaryIssue => e
handle_temporary_issue(message, e)
rescue StandardError => e
# If RabbitMQ closes the channel before ack succeeds, that error is
# treated as a message handling failure as well.
handle_message_failure(message, e)
end

# Routes a message through pending-dead-letter handling or normal
# processing, depending on whether it was previously marked as pending.
#
# @param message [Warren::Subscriber::Base] The message wrapper
# @param payload [String] The raw message payload
# @return [void]
def handle_message(message, payload)
return force_pending_dead_letter(message) if pending_dead_letter?(payload)

process_message(message)
end

# Handles temporary processing failures by pausing and requesting requeue.
#
# @param message [Warren::Subscriber::Base] The message wrapper
# @param exception [StandardError] The temporary failure
# @return [void]
def handle_temporary_issue(message, exception)
warn { "Temporary Issue: #{exception.message}" }
pause!
safe_requeue(message, exception)
end

# Handles non-temporary failures by logging and requesting dead-letter.
#
# @param message [Warren::Subscriber::Base] The message wrapper
# @param exception [StandardError] The failure that occurred
# @return [void]
def handle_message_failure(message, exception)
error { "Message handling failed: #{exception.class}: #{exception.message}" }
debug { exception.backtrace.join("\n") } if exception.backtrace
safe_dead_letter(message, exception)
end

# Returns whether the payload is marked as pending dead-letter.
#
# @param payload [String] The raw message payload
# @return [Boolean]
def pending_dead_letter?(payload)
@pending_dead_letter_messages.include?(compute_message_identifier(payload))
end

# Forces dead-letter for a message previously marked as pending.
#
# @param message [Warren::Subscriber::Base] The message wrapper
# @return [void]
def force_pending_dead_letter(message)
warn { 'Re-processing pending dead-letter message after channel re-established; forcing dead-letter' }
safe_dead_letter(message, StandardError.new('Forced dead-letter after prior channel-closed nack failure'))
end

# Runs subscriber processing and final ack via the framework adaptor.
#
# @param message [Warren::Subscriber::Base] The message wrapper
# @return [void]
def process_message(message)
@adaptor.handle { message._process_ }
end

# Requeues a message after a temporary failure.
#
# If RabbitMQ has already closed the channel, Bunny can raise while
# requeueing. In that case we log the failure and let broker redelivery
# happen after recovery instead of crashing the consumer.
#
# @param message [Warren::Subscriber::Base] The message wrapper to requeue
# @param exception [StandardError] The exception that triggered requeueing
# @return [void]
def safe_requeue(message, exception)
message.requeue(exception)
rescue Bunny::Exception, Timeout::Error => e
warn { "Requeue failed (channel likely closed): #{e.class}: #{e.message}" }
end

# Dead-letters a message after a permanent failure.
#
# If RabbitMQ has already closed the channel, Bunny can raise while
# dead-lettering. In that case we remember the message so it can be
# forced to dead-letter if the broker redelivers it after recovery.
#
# @param message [Warren::Subscriber::Base] The message wrapper to dead-letter
# @param exception [StandardError] The exception that triggered dead-lettering
# @return [void]
def safe_dead_letter(message, exception)
message_identifier = compute_message_identifier(message.payload)
message.dead_letter(exception)
# Dead-letter succeeded, remove from pending if present
@pending_dead_letter_messages.delete(message_identifier)
rescue Bunny::Exception, Timeout::Error => e
message_identifier = compute_message_identifier(message.payload)
warn { "Dead-letter failed (channel likely closed): #{e.class}: #{e.message}" }
@pending_dead_letter_messages.add(message_identifier)
pause!
end

# Computes a stable identifier for a message payload.
#
# Used to track messages whose dead-letter operation failed because the
# channel had already closed, so they can be recognized if redelivered.
#
# @param payload [String] The raw message payload
# @return [String] SHA256 digest for the payload
def compute_message_identifier(payload)
Digest::SHA256.hexdigest(payload)
end

# Rebuilds subscription state after a successful recovery check.
#
# Reopens the subscription and delay exchange on healthy channels,
# re-establishes their bindings, and resubscribes the consumer. If any
# step fails, schedules another recovery attempt.
#
# @return [void]
def recover_subscriptions!
warn { 'Attempting subscription recovery' }
restore_subscriptions!
info { 'Consumer recovered and resubscribed' }
rescue StandardError => e
handle_recovery_failure(e)
end

# Performs the recovery steps required to restore consumption.
#
# Reopens channel-bound objects, reactivates their bindings, transitions
# back to running state, and subscribes a new consumer.
#
# @return [void]
def restore_subscriptions!
subscription.reopen!
delayed.reopen!(channel: subscription.channel)
subscription.activate!
delayed.activate!
running!
subscribe!
end

# Logs a recovery failure and schedules the next retry attempt.
#
# @param exception [StandardError] The recovery failure
# @return [void]
def handle_recovery_failure(exception)
warn { "Recovery failed: #{exception.class}: #{exception.message}" }
debug { exception.backtrace.join("\n") } if exception.backtrace
schedule_recovery_attempt
end

# Schedules the next recovery attempt using exponential backoff.
#
# Increments the attempt counter and sets the next recovery time based on
# {#delay_for_attempt}, capped by {MAX_RECONNECT_DELAY}.
#
# @return [void]
def schedule_recovery_attempt
@recovery_attempts += 1
@recover_at = Time.now + delay_for_attempt
end

def log_message(payload)
Expand All @@ -162,4 +349,5 @@ def log_message(payload)
debug { 'Finished message process' }
end
end
# rubocop:enable Metrics/ClassLength
end
Loading
Loading