good_job for elixir

Concurrent, Postgres-based job queue backend for Elixir. Provides attribute-based job execution with PostgreSQL advisory locks to ensure run-once safety. Works with Phoenix and can be used standalone in other Elixir frameworks or plain Elixir applications.

Port of GoodJob - This Elixir implementation is a port of the excellent Ruby GoodJob gem by Ben Sheldon. It is designed for maximum compatibility with the original, so that Ruby and Elixir applications can run against the same database. It fully implements the GoodJob and ActiveJob conventions, which also makes it possible to add other languages and frameworks that implement the same protocol.

Need Ruby compatibility details? See COMPATIBILITY.md.

Migrating from the Ruby version? See MIGRATION_FROM_RUBY.md for a detailed guide.

Using without Phoenix? See STANDALONE.md for standalone usage.

Features

  • PostgreSQL Backend - Relies upon Postgres integrity with advisory locks (transaction-level for job claims, session-level for process heartbeat) to provide run-once safety
  • LISTEN/NOTIFY - Uses PostgreSQL LISTEN/NOTIFY to reduce queuing latency
  • Multiple Execution Modes - Inline (testing), async (development), external (production)
  • Queue Management - Support for ordered queues, queue-specific concurrency, and semicolon-separated pools
  • Cron Jobs - Scheduled jobs with cron expressions
  • Batch Operations - Batch job tracking and callbacks
  • Concurrency Controls - Per-key concurrency limits and throttling
  • Retry Mechanisms - Automatic retries with exponential backoff
  • Plugins System - Extensible plugin architecture for custom functionality
  • Labels/Tags - Tag jobs for filtering and analytics
  • Web Dashboard - Phoenix LiveView dashboard for monitoring and management
  • Ruby-Compatible - Fully aligned with Ruby GoodJob configuration and database schema
  • Comprehensive Instrumentation - Telemetry events for monitoring and metrics
  • Production Ready - Designed for applications that enqueue 1 million jobs per day or more

Installation

Add good_job to your list of dependencies in mix.exs:

def deps do
  [
    {:good_job, "~> 0.1.1"}
  ]
end

For Phoenix LiveView dashboard support, also ensure you have:

{:phoenix_live_view, "~> 1.1"}

Quick Start

1. Install the Database Migrations

mix good_job.install
mix ecto.migrate

2. Configure GoodJob

# config/config.exs
config :good_job,
  repo: MyApp.Repo,
  execution_mode: :external,  # :inline (test), :async (dev), :external (prod)
  queues: "*",
  max_processes: 5

3. Start GoodJob in Your Application

# lib/my_app/application.ex
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      MyApp.Repo,
      GoodJob.Application
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end

4. Define and Enqueue a Job

defmodule MyApp.MyJob do
  use GoodJob.Job

  @impl GoodJob.Behaviour
  def perform(%{data: data}) do
    # Your job logic here
    IO.puts("Processing: #{inspect(data)}")
    :ok
  end
end

# Enqueue the job
MyApp.MyJob.enqueue(%{data: "hello"})

Usage

Basic Job

defmodule MyApp.EmailJob do
  use GoodJob.Job, queue: "emails", priority: 1

  @impl GoodJob.Behaviour
  def perform(%{to: to, subject: subject, body: body}) do
    MyApp.Mailer.send(to: to, subject: subject, body: body)
    :ok
  end
end

MyApp.EmailJob.enqueue(%{to: "user@example.com", subject: "Hello", body: "World"})

Labeled Jobs (Tags)

defmodule MyApp.TaggedJob do
  use GoodJob.Job, tags: ["billing", "priority"]

  @impl GoodJob.Behaviour
  def perform(_args), do: :ok
end

MyApp.TaggedJob.enqueue(%{user_id: 123}, tags: ["vip"])

Job with Retries

defmodule MyApp.ApiJob do
  use GoodJob.Job, max_attempts: 10

  @impl GoodJob.Behaviour
  def perform(%{url: url}) do
    case HTTPoison.get(url) do
      {:ok, response} -> {:ok, response.body}
      {:error, reason} -> {:error, reason}  # Will retry
    end
  end

  def backoff(attempt) do
    GoodJob.Backoff.exponential(attempt, max: 300)
  end
end
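The backoff/1 callback above delegates to GoodJob.Backoff.exponential/2. To illustrate what a capped exponential backoff computes, here is a minimal standalone sketch. BackoffSketch is a hypothetical module, and the base-2 formula and the default cap are assumptions; the library's actual formula (and any jitter it applies) may differ.

```elixir
defmodule BackoffSketch do
  @moduledoc "Hypothetical illustration only; not part of GoodJob."

  # Delay in seconds: 2^attempt, capped at opts[:max] (assumed default: 300).
  def exponential(attempt, opts \\ []) when is_integer(attempt) and attempt >= 0 do
    max = Keyword.get(opts, :max, 300)
    min(Integer.pow(2, attempt), max)
  end
end

# Delays for the first ten attempts with a 300-second cap
delays = Enum.map(1..10, &BackoffSketch.exponential(&1, max: 300))
IO.inspect(delays)
#=> [2, 4, 8, 16, 32, 64, 128, 256, 300, 300]
```

The cap matters with max_attempts: 10: without it, the tenth delay in this sketch would be 1024 seconds rather than 300.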

Cron Jobs

# config/config.exs
config :good_job,
  enable_cron: true,
  cron: %{
    cleanup: %{
      cron: "0 2 * * *",  # Every day at 2 AM
      class: MyApp.CleanupJob,
      args: %{},
      queue: "default"
    }
  }

Batch Jobs

batch = GoodJob.Batch.create(%{
  description: "Process users",
  on_finish: "MyApp.BatchFinishedJob"
})

User
|> Repo.all()
|> Enum.each(fn user ->
  ProcessUserJob.enqueue(%{user_id: user.id}, batch_id: batch.id)
end)

Concurrency Controls

defmodule MyApp.UserJob do
  use GoodJob.Job

  @impl GoodJob.Behaviour
  def perform(%{user_id: user_id}) do
    # Process user
  end

  def good_job_concurrency_config do
    [
      key: fn %{user_id: user_id} -> "user_#{user_id}" end,
      limit: 5,
      perform_throttle: {10, 60} # max 10 executions per 60s for the key
    ]
  end
end

Throttling Only (No Concurrency Limit)

defmodule MyApp.ThrottledJob do
  use GoodJob.Job

  @impl GoodJob.Behaviour
  def perform(_args), do: :ok

  def good_job_concurrency_config do
    [
      key: fn _args -> "global" end,
      enqueue_throttle: {100, 60}
    ]
  end
end
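One way to read the {count, period} throttle tuples used above is as a sliding window: an action is allowed while fewer than count earlier occurrences fall inside the last period seconds. The sketch below shows that reading as a pure function. ThrottleSketch and allow?/3 are hypothetical names, and the exact window semantics are an assumption; GoodJob performs its own check against the database.

```elixir
defmodule ThrottleSketch do
  @moduledoc "Hypothetical illustration only; not part of GoodJob."

  # `timestamps` are Unix seconds of earlier executions for one key.
  # Returns true when fewer than `limit` of them fall inside the
  # window (now - period, now].
  def allow?(timestamps, {limit, period}, now) do
    window_start = now - period
    recent = Enum.count(timestamps, fn ts -> ts > window_start and ts <= now end)
    recent < limit
  end
end

now = 1_000
# Ten executions, one per second, ending at `now`
history = Enum.to_list(991..1_000)

IO.inspect(ThrottleSketch.allow?(history, {10, 60}, now))
#=> false
IO.inspect(ThrottleSketch.allow?(Enum.drop(history, 1), {10, 60}, now))
#=> true
```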

Bulk Enqueue

You can buffer and insert multiple jobs atomically with GoodJob.Bulk:

{:ok, jobs} =
  GoodJob.Bulk.enqueue(fn ->
    MyApp.EmailJob.perform_later(%{user_id: 1})
    MyApp.EmailJob.perform_later(%{user_id: 2})
  end)

length(jobs)
#=> 2

You can also enqueue job instances directly:

jobs = [
  MyApp.EmailJob.new(%{user_id: 1}),
  MyApp.EmailJob.new(%{user_id: 2}, queue: "mailers")
]

{:ok, _inserted} = GoodJob.Bulk.enqueue(jobs)

Queue Configuration

# Process all queues
queues: "*"

# Comma-separated queues (legacy format)
queues: "queue1:5,queue2:10"

# Semicolon-separated pools (Ruby GoodJob format)
queues: "queue1:2;queue2:1;*"

# Ordered queues (process in order)
queues: "+queue1,queue2:5"

# Excluded queues
queues: "-queue1,queue2:2"

Note: Only * is supported as a wildcard (standalone, not in patterns like queue*).
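To make the structure of these strings concrete, the following sketch splits a queue string into pools (separated by ;), entries (separated by ,), an optional leading + or - marker, and an optional :N suffix. QueueStringSketch is a hypothetical module written for this README and is not the parser GoodJob uses; it only exposes the structure and leaves the meaning of each number to the library.

```elixir
defmodule QueueStringSketch do
  @moduledoc "Hypothetical illustration only; not GoodJob's queue parser."

  # Splits the string into pools on ";" and parses each pool.
  def parse(string) when is_binary(string) do
    string
    |> String.split(";", trim: true)
    |> Enum.map(&parse_pool(String.trim(&1)))
  end

  # A leading "+" marks ordered queues, a leading "-" marks excluded queues.
  defp parse_pool("+" <> rest), do: build(:ordered, rest)
  defp parse_pool("-" <> rest), do: build(:exclude, rest)
  defp parse_pool(rest), do: build(:include, rest)

  defp build(mode, rest) do
    entries = rest |> String.split(",", trim: true) |> Enum.map(&parse_entry/1)
    %{mode: mode, entries: entries}
  end

  # "name:N" becomes {"name", N}; a bare "name" becomes {"name", nil}.
  defp parse_entry(entry) do
    case String.split(String.trim(entry), ":", parts: 2) do
      [name, count] -> {name, String.to_integer(count)}
      [name] -> {name, nil}
    end
  end
end

IO.inspect(QueueStringSketch.parse("queue1:2;queue2:1;*"))
IO.inspect(QueueStringSketch.parse("+queue1,queue2:5"))
```

For "queue1:2;queue2:1;*" the sketch yields three pools, the last of which contains only the standalone wildcard.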

Execution Modes

  • :inline - Execute immediately in current process (test/dev only)
  • :async - Execute jobs in separate processes inside the web server process only (development)
  • :external - Enqueue only, requires separate worker process (production default)
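One possible way to select the mode per environment is to branch on config_env/0 in a single config file. This is a sketch, not the project's recommended layout; the mapping follows the roles listed above (inline for tests, async for development, external for production), and only the execution_mode key shown elsewhere in this README is used.

```elixir
# config/config.exs
import Config

# Pick the execution mode from the current Mix environment.
execution_mode =
  case config_env() do
    :test -> :inline
    :dev -> :async
    _ -> :external
  end

config :good_job, execution_mode: execution_mode
```

With :external, jobs are only enqueued by the application, so a separate worker process still has to be running to execute them.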

Configuration

# config/config.exs
config :good_job,
  repo: MyApp.Repo,
  execution_mode: :external,
  queues: "*",
  max_processes: 5,
  poll_interval: 10,
  enable_listen_notify: true,
  enable_cron: false,
  cleanup_discarded_jobs: true,
  cleanup_preserved_jobs_before_seconds_ago: 1_209_600, # 14 days
  cleanup_preserved_jobs_max_count: 1_000,
  advisory_lock_function: :pg_try_advisory_xact_lock,
  advisory_lock_hash_algorithm: :md5

See config/prod.exs.example for a complete configuration example with all available options.

Stale locks and enqueue concurrency retries

  • stale_lock_release_after_seconds (default 60) and GOOD_JOB_STALE_LOCK_RELEASE_AFTER_SECONDS control how long a good_jobs row may stay locked before the worker’s periodic sweep clears locked_by_id / locked_at / performed_at for rows that look abandoned. Increase this if your jobs routinely run longer than the default window while holding the row lock.

  • GoodJob.enqueue/3 with a concurrency_key runs its concurrency limit check inside an Ecto Repo transaction. If another connection holds the per-key advisory lock, the check returns {:error, :lock_failed} from inside the transaction function. Because Ecto wraps the return value of a zero-arity transaction function in {:ok, value}, the caller sees {:ok, {:error, :lock_failed}}. GoodJob retries that case a few times with a short sleep so transient contention does not fail the enqueue immediately.
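The retry behaviour described in the last bullet can be sketched as a small loop around a function that returns the transaction result. LockRetrySketch is a hypothetical module; the attempt count and sleep duration are assumptions rather than GoodJob's actual values, and the Agent below only simulates a lock that is busy twice.

```elixir
defmodule LockRetrySketch do
  @moduledoc "Hypothetical illustration only; not GoodJob's internal code."

  # `fun` stands in for a Repo.transaction call whose inner function
  # returned {:error, :lock_failed}, which the caller sees wrapped in {:ok, _}.
  def run(fun, attempts \\ 3, sleep_ms \\ 10) when attempts > 0 do
    case fun.() do
      {:ok, {:error, :lock_failed}} when attempts > 1 ->
        # Transient contention: wait briefly, then try again.
        Process.sleep(sleep_ms)
        run(fun, attempts - 1, sleep_ms)

      {:ok, {:error, :lock_failed}} ->
        # Out of attempts: surface the failure.
        {:error, :lock_failed}

      {:ok, result} ->
        result

      {:error, reason} ->
        {:error, reason}
    end
  end
end

# Simulate a lock that is busy for the first two checks and free afterwards.
{:ok, counter} = Agent.start_link(fn -> 0 end)

check = fn ->
  case Agent.get_and_update(counter, fn n -> {n, n + 1} end) do
    n when n < 2 -> {:ok, {:error, :lock_failed}}
    _ -> {:ok, {:ok, :enqueued}}
  end
end

IO.inspect(LockRetrySketch.run(check))
#=> {:ok, :enqueued}
```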

Advisory Lock Configuration

  • :advisory_lock_function controls advisory lock acquisition for transactional lock paths (job claims and concurrency checks). Default: :pg_try_advisory_xact_lock.
  • :advisory_lock_hash_algorithm controls lock-key derivation strategy. Default: :md5. Supported: :md5, :sha1, :sha224, :sha256, :sha384, :sha512, :hashtextextended, :hashtext, :uuid_v5.
  • Session-level locks for process heartbeat use pg_try_advisory_lock.

Environment variables:

  • GOOD_JOB_ADVISORY_LOCK_FUNCTION
  • GOOD_JOB_ADVISORY_LOCK_HASH_ALGORITHM

Notes:

  • :hybrid dequeue embeds a fixed MD5-based advisory lock expression in SQL for candidate rows. It does not use :advisory_lock_hash_algorithm (that setting applies to Elixir AdvisoryLock and :advisory claims). Use one :lock_strategy per database.

  • hashtextextended requires PostgreSQL 11+.

  • hashtext is available in all supported PostgreSQL versions (and documented at least since PostgreSQL 9.6).

  • sha* strategies require PostgreSQL pgcrypto (digest()).

  • uuid_v5 requires PostgreSQL uuid-ossp (uuid_generate_v5()).
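PostgreSQL advisory lock functions take a bigint key, so any hash-based strategy has to reduce a string to a signed 64-bit integer. The sketch below shows one common way to do that with MD5: read the first 8 bytes of the digest as a signed big-endian integer. LockKeySketch is a hypothetical module, and both the "good_jobs-" prefix and the sample id are assumptions for illustration; the exact input string and derivation GoodJob uses may differ.

```elixir
defmodule LockKeySketch do
  @moduledoc "Hypothetical illustration only; not GoodJob's AdvisoryLock module."

  # Reads the first 8 bytes of the MD5 digest as a signed 64-bit
  # big-endian integer, which fits PostgreSQL's bigint range.
  def md5_key(string) when is_binary(string) do
    <<key::signed-big-integer-size(64), _rest::binary>> = :crypto.hash(:md5, string)
    key
  end
end

# Assumed input format: table name, a dash, then the job id (sample value).
key = LockKeySketch.md5_key("good_jobs-" <> "00000000-0000-0000-0000-000000000000")
IO.inspect(key)
```

Because the result is signed, roughly half of all inputs map to negative keys, which PostgreSQL accepts.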

Web Dashboard

Phoenix LiveDashboard Integration (Recommended)

# lib/my_app_web/router.ex
import Phoenix.LiveDashboard.Router

live_dashboard "/dashboard",
  metrics: MyAppWeb.Telemetry,
  additional_pages: [
    good_job: GoodJob.Web.LiveDashboardPage
  ]

Standalone Dashboard

# lib/my_app_web/router.ex
scope "/good_job" do
  pipe_through :browser
  live "/", GoodJob.Web.LiveDashboard, :index
end

Note: The web dashboard requires Phoenix. For monitoring without Phoenix, see STANDALONE.md.

Testing

# config/test.exs
config :good_job,
  execution_mode: :inline

# In your tests
import GoodJob.Testing

test "job is enqueued" do
  MyApp.MyJob.enqueue(%{data: "test"})
  assert_enqueued(MyApp.MyJob, %{data: "test"})
end

Requirements

  • Elixir >= 1.19
  • PostgreSQL >= 12
  • Ecto >= 3.0
  • Phoenix >= 1.7 (optional, for Phoenix integration)
  • Phoenix LiveView >= 1.1 (optional, for LiveView dashboard)

Note: GoodJob can be used without Phoenix! See STANDALONE.md.

Examples

Complete working examples are available in the examples/ directory:

  • habit_tracker - A full Phoenix application demonstrating GoodJob integration with LiveView dashboard, cron jobs, and batch operations
  • monorepo_example - A monorepo setup showing Ruby and Elixir applications sharing the same GoodJob database

See examples/README.md for more details.

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/amkisko/good_job.ex

See CONTRIBUTING.md for guidelines.

Credits

This Elixir implementation is a port of GoodJob by Ben Sheldon. We are grateful for the excellent design and implementation of the original Ruby version, which served as the foundation for this port.

License

The library is available as open source under the terms of the MIT License.
