diff --git a/lib/resque-ext/job.rb b/lib/resque-ext/job.rb index 9da81ed..bc40f77 100644 --- a/lib/resque-ext/job.rb +++ b/lib/resque-ext/job.rb @@ -13,11 +13,23 @@ def self.create_with_loner(queue, klass, *args) return create_without_loner(queue, klass, *args) if Resque.inline? item = { class: klass.to_s, args: args } return 'EXISTED' if Resque::Plugins::Loner::Helpers.loner_queued?(queue, item) + + # Guard against nested MULTI when called from inside resque-scheduler's own multi block. + # In redis 4.x, @client is replaced with a Pipeline object (responds to :futures) during + # a multi block. Opening another multi here would merge a second MULTI command into the + # outer pipeline, causing ERR MULTI calls can not be nested from Redis. + # Instead, call directly — both commands flow through DeprecatedMulti into the outer pipeline. + if Resque.redis.redis.instance_variable_get(:@client).respond_to?(:futures) + create_return_value = create_without_loner(queue, klass, *args) + Resque::Plugins::Loner::Helpers.mark_loner_as_queued(queue, item) + return create_return_value + end + # multi block returns array of keys create_return_value = false - Resque.redis.multi do + Resque.redis.multi do |pipeline| create_return_value = create_without_loner(queue, klass, *args) - Resque::Plugins::Loner::Helpers.mark_loner_as_queued(queue, item) + Resque::Plugins::Loner::Helpers.mark_loner_as_queued(queue, item, pipeline) end create_return_value end diff --git a/lib/resque-ext/resque.rb b/lib/resque-ext/resque.rb index 4e7f4a8..9da914e 100644 --- a/lib/resque-ext/resque.rb +++ b/lib/resque-ext/resque.rb @@ -1,4 +1,30 @@ module Resque + class << self + alias_method :original_enqueue_to, :enqueue_to + + # Patch enqueue_to to return Job.create result instead of always returning true + # This allows resque-loner's "EXISTED" return value to propagate back to the caller + # Required for Resque 2.x compatibility where enqueue_to ignores Job.create result + def enqueue_to(queue, klass, *args) + # Perform before_enqueue hooks. Don't perform enqueue if any hook returns false + before_hooks = Plugin.before_enqueue_hooks(klass).collect do |hook| + klass.send(hook, *args) + end + return nil if before_hooks.any? { |result| result == false } + + # Capture and return Job.create result (allows "EXISTED" to propagate) + result = Job.create(queue, klass, *args) + + Plugin.after_enqueue_hooks(klass).each do |hook| + klass.send(hook, *args) + end + + # Return actual result instead of always true + # For resque-loner: returns "EXISTED" if duplicate, otherwise truthy value + result.nil? ? true : result + end + end + def self.enqueued?(klass, *args) enqueued_in?(queue_from_class(klass), klass, *args) end diff --git a/lib/resque-loner/helpers.rb b/lib/resque-loner/helpers.rb index 11d0426..209343a 100644 --- a/lib/resque-loner/helpers.rb +++ b/lib/resque-loner/helpers.rb @@ -11,12 +11,12 @@ def self.loner_queued?(queue, item) redis.get(unique_job_queue_key(queue, item)) == '1' end - def self.mark_loner_as_queued(queue, item) + def self.mark_loner_as_queued(queue, item, pipeline = nil) return unless item_is_a_unique_job?(item) key = unique_job_queue_key(queue, item) - redis.set(key, 1) + (pipeline || redis).set(key, 1) unless (ttl = item_ttl(item)) == -1 # no need to incur overhead for default value - redis.expire(key, ttl) + (pipeline || redis).expire(key, ttl) end end diff --git a/lib/resque-loner/version.rb b/lib/resque-loner/version.rb index affafda..dc039da 100644 --- a/lib/resque-loner/version.rb +++ b/lib/resque-loner/version.rb @@ -1,7 +1,7 @@ module Resque module Plugins module Loner - VERSION = '1.3.0' + VERSION = '1.3.2' end end end