Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

* Raise ActiveRecord error if Job failed to be queued
* Renamed `Disqualified.server_options` to `Disqualified.config`
* Renamed `Disqualified.configure_server` to `Disqualified.configure`
* Renamed configuration options

## v0.3.0 - 2023-04-16

Expand Down
5 changes: 5 additions & 0 deletions app/models/disqualified/internal.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# typed: strict

class Disqualified::Internal < Disqualified::BaseRecord
self.table_name = "disqualified_internals"
end
8 changes: 8 additions & 0 deletions app/models/disqualified/record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ class Disqualified::Record < Disqualified::BaseRecord

self.table_name = "disqualified_jobs"

serialize :metadata, JSON

scope :runnable, -> { where(finished_at: nil, run_at: (..Time.now), locked_by: nil) }

sig do
Expand Down Expand Up @@ -72,6 +74,12 @@ def run!

sig { void }
def finish
Kernel.catch(:abort) do
Disqualified.config.plugins.sorted_plugins.each do |plugin|
plugin.before_finish(record: self)
end
end

update!(locked_by: nil, locked_at: nil, finished_at: Time.now)
end

Expand Down
7 changes: 6 additions & 1 deletion lib/disqualified.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module Disqualified
end

require "optparse"
require "tsort"

require "concurrent"
require "rails"
Expand All @@ -12,9 +13,13 @@ module Disqualified
require_relative "disqualified/error"
require_relative "disqualified/logging"

require_relative "disqualified/configuration"
require_relative "disqualified/engine"
require_relative "disqualified/job"
require_relative "disqualified/main"
require_relative "disqualified/pool"
require_relative "disqualified/server_configuration"
require_relative "disqualified/options"
require_relative "disqualified/plugin"
require_relative "disqualified/plugin_registry"
require_relative "disqualified/unique"
require_relative "disqualified/version"
28 changes: 14 additions & 14 deletions lib/disqualified/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def self.run

class ServerEngine < Rails::Engine
config.before_initialize do
Disqualified.server_options = Disqualified::ServerConfiguration.new
Disqualified.config = Disqualified::Configuration.new
end
end

Expand All @@ -27,11 +27,11 @@ def run

option_parser.parse(@original_argv)

server_options = T.must(Disqualified.server_options)
delay_range = server_options.delay_range
error_hooks = server_options.error_hooks
logger = server_options.logger
pool_size = server_options.pool_size
config = Disqualified.config
delay_range = config.delay_range
error_hooks = config.execution_error_hooks
logger = config.logger
pool_size = config.pool_size

# standard:disable Style/StringLiterals
logger.info { ' ____ _ ___ _____ __' }
Expand All @@ -41,7 +41,7 @@ def run
logger.info { '/_____/_/____/\__, /\__,_/\__,_/_/_/_/ /_/\___/\__,_/' }
logger.info { ' /_/' + "v#{Disqualified::VERSION}".rjust(32, " ") }
# standard:enable Style/StringLiterals
logger.info { Disqualified.server_options.to_s }
logger.info { Disqualified.config.to_s }

pool = Disqualified::Pool.new(delay_range:, pool_size:, error_hooks:, logger:) do |args|
args => {promise_index:}
Expand All @@ -64,18 +64,18 @@ def option_parser
option_parser = OptionParser.new do |opts|
opts.banner = "Usage: #{File.basename($0)} [OPTIONS]"

server_options = T.must(Disqualified.server_options)
config = Disqualified.config

opts.on("--delay-low SECONDS", Numeric, "Default: #{server_options.delay_low}") do |value|
server_options.delay_low = value
opts.on("--poll-low SECONDS", Numeric, "Default: #{config.poll_low}") do |value|
config.poll_low = value
end

opts.on("--delay-high SECONDS", Numeric, "Default: #{server_options.delay_high}") do |value|
server_options.delay_high = value
opts.on("--poll-high SECONDS", Numeric, "Default: #{config.poll_high}") do |value|
config.poll_high = value
end

opts.on("--pool COUNT", Integer, "Default: #{server_options.pool_size}") do |value|
server_options.pool_size = value
opts.on("--pool COUNT", Integer, "Default: #{config.pool_size}") do |value|
config.pool_size = value
end

opts.on("-h", "--help", "Prints this help") do
Expand Down
72 changes: 72 additions & 0 deletions lib/disqualified/configuration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# typed: strict

module Disqualified
extend T::Sig

class << self
extend T::Sig

sig { params(config: Disqualified::Configuration).returns(Disqualified::Configuration) }
attr_writer :config

sig { returns(Disqualified::Configuration) }
def config
@config ||= T.let(Disqualified::Configuration.new, T.nilable(Disqualified::Configuration))
end

sig { params(block: T.proc.params(arg0: Disqualified::Configuration).void).void }
def configure(&block)
block.call(config)
end
end
end

class Disqualified::Configuration
extend T::Sig

sig { void }
def initialize
@poll_high = T.let(5.0, Numeric)
@poll_low = T.let(1.0, Numeric)
@logger = T.let(Rails.logger, T.untyped)
@pool_size = T.let(5, Integer)
@pwd = T.let(Dir.pwd, String)
@execution_error_hooks = T.let([], T::Array[Disqualified::Logging::ERROR_HOOK_TYPE])
@plugins = T.let(Disqualified::PluginRegistry.new, Disqualified::PluginRegistry)

plugins.register(Disqualified::Unique::Plugin.new)
end

sig { returns(Numeric) }
attr_accessor :poll_high
sig { returns(Numeric) }
attr_accessor :poll_low
sig { returns(T::Array[Disqualified::Logging::ERROR_HOOK_TYPE]) }
attr_accessor :execution_error_hooks
sig { returns(T.untyped) }
attr_accessor :logger
sig { returns(Integer) }
attr_accessor :pool_size
sig { returns(String) }
attr_accessor :pwd
sig { returns(Disqualified::PluginRegistry) }
attr_accessor :plugins

private :execution_error_hooks=
private :plugins=

sig { returns(T::Range[Float]) }
def delay_range
poll_low.to_f..poll_high.to_f
end

sig { params(block: Disqualified::Logging::ERROR_HOOK_TYPE).void }
def on_execution_error(&block)
execution_error_hooks.push(block)
end

sig { returns(String) }
def to_s
"{ delay: #{delay_range}, pool_size: #{pool_size}, error_hooks_size: #{execution_error_hooks.size} }"
end
end
3 changes: 3 additions & 0 deletions lib/disqualified/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@ class JobAlreadyFinished < DisqualifiedError

class JobNotClaimed < DisqualifiedError
end

class DuplicateSetting < DisqualifiedError
end
end
end
33 changes: 32 additions & 1 deletion lib/disqualified/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,44 @@ module Disqualified::Job
module ClassMethods
extend T::Sig

sig { returns(Disqualified::Options) }
private def job_options
@job_options ||= T.let(Disqualified::Options.new, T.nilable(Disqualified::Options))
end

sig { params(till: Symbol, including: Symbol).void }
private def unique(till = :until_executed, including: :arguments)
if job_options.key?("unique")
Kernel.raise Disqualified::Error::DuplicateSetting, "`unique` called more than once"
end

job_options["unique"] = {
"till" => till,
"including" => including,
"handler" => T.unsafe(self).name
}
end

sig { params(the_time: T.any(Time, Date, ActiveSupport::TimeWithZone), args: T.untyped).void }
def perform_at(the_time, *args)
metadata = {}
before_queue_completed = T.let(false, T::Boolean)

Kernel.catch(:abort) do
Disqualified.config.plugins.sorted_plugins.each do |plugin|
plugin.before_queue(metadata:, job_options:, arguments: args)
end
before_queue_completed = true
end

return unless before_queue_completed

Disqualified::Record.create!(
handler: T.unsafe(self).name,
arguments: JSON.dump(args),
queue: "default",
run_at: the_time
run_at: the_time,
metadata:
)
end

Expand Down
34 changes: 34 additions & 0 deletions lib/disqualified/options.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# typed: strict

class Disqualified::Options
extend T::Sig

Scalar = T.type_alias { T.any(Integer, String, T::Boolean) }
FlatHash = T.type_alias { T::Hash[String, Scalar] }
Value = T.type_alias { T.nilable(T.any(Scalar, FlatHash)) }

sig { void }
def initialize
@data = T.let({}, T::Hash[String, Value])
end

sig { params(key: String).returns(Value) }
def [](key)
@data[key]
end

sig { params(key: String, value: Value).returns(Value) }
def []=(key, value)
@data[key] = value
end

sig { params(key: String).returns(T::Boolean) }
def key?(key)
@data.key?(key)
end

sig { returns(T::Hash[String, Value]) }
private def to_h
@data
end
end
39 changes: 39 additions & 0 deletions lib/disqualified/plugin.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# typed: strict

module Disqualified::Plugin
extend T::Sig
extend T::Helpers
abstract!

sig { abstract.returns(String) }
def name
end

sig { abstract.returns(String) }
def job_config_namespace
end

sig { abstract.returns(String) }
def metadata_namespace
end

sig { overridable.void }
def on_registry
end

sig do
overridable
.params(
metadata: T.untyped,
job_options: Disqualified::Options,
arguments: T.untyped
)
.void
end
def before_queue(metadata:, job_options:, arguments:)
end

sig { overridable.params(record: Disqualified::Record).void }
def before_finish(record:)
end
end
60 changes: 60 additions & 0 deletions lib/disqualified/plugin_registry.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# typed: strict

class Disqualified::PluginRegistry
include TSort
extend T::Sig

sig { void }
def initialize
@ordering = T.let({}, T::Hash[String, T::Array[String]])
@registered = T.let({}, T::Hash[String, Disqualified::Plugin])
end

ONE_PLUS_PLUGINS = T.type_alias { T.any(String, T::Array[String]) }

sig { params(plugin: Disqualified::Plugin, after: ONE_PLUS_PLUGINS, before: ONE_PLUS_PLUGINS).void }
def register(plugin, after: [], before: [])
@registered[plugin.name] = plugin
order(plugin.name, after:, before:)
end

sig { params(plugin_name: String, after: ONE_PLUS_PLUGINS, before: ONE_PLUS_PLUGINS).void }
def order(plugin_name, after: [], before: [])
@ordering[plugin_name] ||= []
@ordering[plugin_name].push(*after)
[].push(*before).each do |dependant|
@ordering[dependant] ||= []
T.must(@ordering[dependant]).push(plugin_name)
end
end

sig { params(block: T.proc.params(arg0: String).void).void }
def tsort_each_node(&block)
@ordering.each do |key, _|
next unless @registered.key?(key)
block.call(key)
end
end

sig { params(node: String, block: T.proc.params(arg0: String).void).void }
def tsort_each_child(node, &block)
@ordering.fetch(node).each do |dependency|
next unless @registered.key?(dependency)
block.call(dependency)
end
end

sig { returns(T::Array[String]) }
def sorted
tsort
end

sig { returns(T::Array[Disqualified::Plugin]) }
def sorted_plugins
tsort.map do |plugin_name|
@registered.fetch(plugin_name)
end
end

private :tsort
end
Loading