mnbuhl/Atomizer

Atomizer

Break down complexity. Scale with confidence. Atomizer transforms large-scale job processing into atomic, reliable, and easily managed units, making distributed systems simple, robust, and a joy to work with.

Overview

Atomizer is a modern, high-performance job scheduling and queueing framework for ASP.NET Core. Built for cloud-native, distributed applications as well as smaller setups and local development, Atomizer provides a powerful yet easy-to-use solution for processing background jobs, handling complex workflows, and scaling your applications effortlessly.

  • Effortless distributed scaling: Atomizer works seamlessly in clustered setups, letting you process jobs across multiple servers for true horizontal scalability.
  • Flexible architecture: Plug in your preferred storage backend, configure multiple queues, and extend with custom drivers or handlers.
  • Reliable and robust: Enjoy graceful shutdowns, automatic retries, and job visibility timeouts to ensure jobs are never lost or duplicated.
  • Developer-friendly: Atomizer integrates with ASP.NET Core DI, logging, and modern C# features, so you can focus on your business logic.

Features

  • ⏰ Recurring Scheduled Jobs: Cron-like recurring execution for time-based workflows.
  • 🚀 Distributed Processing: Scale out to as many servers as your storage backend supports; Atomizer coordinates job execution across the cluster.
  • 🗄️ Multiple Storage Backends: Use Entity Framework Core for durable, database-backed queues; in-memory for fast local development & testing; Redis support coming soon.
  • 🔀 Multiple Queues: Configure independent queues with custom processing options for each workload.
  • 🧩 Extensible Drivers & Handlers: Easily add new storage drivers or job handlers; auto-register handlers from assemblies.
  • ♻️ Advanced Retry Policies: Automatic, configurable retries to keep your jobs running smoothly, even when things go wrong.
  • 🛑 Graceful Shutdown: Ensure in-flight jobs finish and pending batched jobs are safely released for re-processing during shutdowns.
  • 📦 Batch Processing: Tune throughput with batch size and parallelism settings per queue.
  • ⏳ Visibility Timeout: Prevent job duplication by locking jobs during processing.
  • 🕒 FIFO Partitioned Processing: Guarantee strict in-order, one-at-a-time execution per partition key (e.g. per customer, per entity).
  • 🧪 In-Memory Driver: Perfect for local development and testing; spin up queues instantly with zero setup.
  • 🔔 ASP.NET Core Integration: Works with DI, logging, and modern C# idioms.
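
To make the visibility timeout idea from the list above more concrete, here is a minimal in-memory sketch. It is an illustration of the general technique only, not Atomizer's implementation: the LeasedQueue type and its methods are hypothetical names invented for this example. A leased job is hidden from other workers until its timeout elapses; if the worker never completes it, the job becomes visible again and can be retried.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of a visibility timeout; not Atomizer's implementation.
public sealed class LeasedQueue
{
    private sealed class Entry
    {
        public string Payload = "";
        public DateTimeOffset VisibleAt; // the job may be leased once now >= VisibleAt
    }

    private readonly Dictionary<Guid, Entry> _entries = new();
    private readonly List<Guid> _order = new(); // preserves enqueue order

    public Guid Enqueue(string payload, DateTimeOffset now)
    {
        var id = Guid.NewGuid();
        _entries[id] = new Entry { Payload = payload, VisibleAt = now };
        _order.Add(id);
        return id;
    }

    // Returns the oldest visible job and hides it until now + timeout.
    public (Guid Id, string Payload)? TryLease(DateTimeOffset now, TimeSpan timeout)
    {
        foreach (var id in _order)
        {
            var entry = _entries[id];
            if (entry.VisibleAt <= now)
            {
                entry.VisibleAt = now + timeout;
                return (id, entry.Payload);
            }
        }
        return null;
    }

    // Called when a handler finishes: the job is removed for good.
    public void Complete(Guid id)
    {
        if (_entries.Remove(id)) _order.Remove(id);
    }
}
```

The timeout should comfortably exceed the longest expected handler run time; otherwise a slow job can reappear and be picked up a second time while the first attempt is still running.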

Planned Features

  • 📈 Dashboard: Live monitoring, retry/dead-letter management, and operational insights.
  • ⚡ Redis Driver: Lightning-fast, distributed, in-memory queues for massive scale.

Quick Start

Get up and running in minutes:

1. Install the package

# Add Atomizer core and EF Core storage support
dotnet add package Atomizer
dotnet add package Atomizer.EntityFrameworkCore

2. Configure Atomizer

Set up Atomizer in your ASP.NET Core project:

builder.Services.AddAtomizer(options =>
{
    // Configure the default queue 
    // (optional, a default queue is created automatically with configuration like below)
    options.AddQueue(QueueKey.Default, queue => 
    {
        queue.DegreeOfParallelism = 4; // Max 4 jobs processed concurrently
        queue.BatchSize = 10; // Retrieve 10 jobs at a time
        queue.VisibilityTimeout = TimeSpan.FromMinutes(5); // Prevent job duplication by "hiding" jobs for 5 minutes while processing
        queue.StorageCheckInterval = TimeSpan.FromSeconds(15); // Poll for new jobs every 15 seconds
    });
    
    // Add more queues as needed
    options.AddQueue("product", queue => 
    {
        queue.DegreeOfParallelism = 2;
        queue.BatchSize = 5;
    });
    
    // Register job handlers automatically
    options.AddHandlersFrom<AssignStockJobHandler>();
    
    // Use EF Core-backed job storage
    options.UseEntityFrameworkCoreStorage<ExampleDbContext>();
});

// Add Atomizer processing services
builder.Services.AddAtomizerProcessing(options =>
{
    options.StartupDelay = TimeSpan.FromSeconds(5); // Delay startup to allow other services to initialize
    options.GracefulShutdownTimeout = TimeSpan.FromSeconds(30); // Allow up to 30 seconds for jobs to finish on shutdown
});
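
As a rough mental model of how BatchSize and DegreeOfParallelism interact, the sketch below processes one fetched batch while keeping at most a fixed number of handlers in flight. It is an assumption-laden illustration using only the .NET base class library; BatchProcessor and ProcessBatchAsync are hypothetical names and do not reflect how Atomizer schedules work internally.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; illustrates bounded parallelism over one batch.
public static class BatchProcessor
{
    // Runs handler for every job in the batch, with at most
    // degreeOfParallelism handlers executing at the same time.
    public static async Task ProcessBatchAsync<T>(
        IReadOnlyList<T> batch, int degreeOfParallelism, Func<T, Task> handler)
    {
        using var gate = new SemaphoreSlim(degreeOfParallelism);
        var running = new List<Task>(batch.Count);

        foreach (var job in batch)
        {
            await gate.WaitAsync();      // wait for a free slot
            running.Add(RunAsync(job));  // start the handler without awaiting it
        }

        await Task.WhenAll(running);

        async Task RunAsync(T job)
        {
            try { await handler(job); }
            finally { gate.Release(); }  // free the slot even if the handler throws
        }
    }
}
```

With a batch of 10 and a parallelism of 4, the first four jobs start immediately and each remaining job starts as soon as a slot frees up.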

Inside your DbContext:

protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    modelBuilder.AddAtomizerEntities(schema: "atomizer");
    // ...other model config...
}

Make sure to run migrations to create the necessary tables.

Note:

If using MySQL, set schema to null, or configure schema behavior in your DbContext, because MySQL does not support EF Core schemas.
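
For MySQL, the note above would presumably translate to the following variation of the DbContext snippet. This is a sketch based only on that note; it assumes the schema parameter of AddAtomizerEntities accepts null.

```csharp
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
    // Assumption: passing null means no schema is applied to the Atomizer tables.
    modelBuilder.AddAtomizerEntities(schema: null);
    // ...other model config...
}
```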

3. Define a Job Handler

Create a handler for your job payload:

public record SendNewsletterCommand(Product Product);

public class SendNewsletterJob(INewsletterService newsletterService, IEmailService emailService)
    : IAtomizerJob<SendNewsletterCommand>
{
    public async Task HandleAsync(SendNewsletterCommand payload, JobContext context)
    {
        var subscribers = await newsletterService.GetSubscribersAsync(payload.Product.CategoryId);
        var emails = new List<Email>();
        
        foreach (var subscriber in subscribers)
        {
            emails.Add(new Email { /* ... */ });
        }
        
        await Task.WhenAll(emails.ConvertAll(email => emailService.SendEmailAsync(email)));
    }
}

4. Enqueue or schedule a Job

Add jobs to the queue from your application code:

app.MapPost(
    "/products",
    async ([FromServices] IAtomizerClient atomizerClient, [FromServices] ExampleDbContext dbContext) =>
    {
        var product = new Product { CategoryId = Guid.NewGuid(), /* ... */ };
        dbContext.Products.Add(product);
        await dbContext.SaveChangesAsync();
        
        await atomizerClient.EnqueueAsync(new SendNewsletterCommand(product));

        return Results.Created($"/products/{product.Id}", product);
    }
);

5. FIFO Processing (Partitioned Jobs)

To guarantee jobs for the same entity execute one-at-a-time in enqueue order, assign a PartitionKey:

// All stock events for the same product are processed in strict FIFO order.
await atomizerClient.EnqueueAsync(
    new StockEvent(productId, "restock", delta: 50),
    options => options.PartitionKey = new PartitionKey(productId.ToString())
);

Jobs sharing the same PartitionKey and queue are serialized: the next job in the partition only starts after the previous one completes (or fails and is rescheduled). Unpartitioned jobs in the same queue are unaffected and continue to process in parallel.
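
The serialization rule described above can be pictured with a small in-process sketch: each partition key keeps a "tail" task, and a new job for that key is chained behind it, while jobs without a key start right away. This is a conceptual illustration only; PartitionedRunner is a hypothetical type, and Atomizer itself coordinates partitions through its storage backend rather than through in-memory task chains.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical illustration of per-partition serialization; not Atomizer's implementation.
public sealed class PartitionedRunner
{
    private readonly object _gate = new();
    private readonly Dictionary<string, Task> _tails = new();

    public Task Run(string? partitionKey, Func<Task> job)
    {
        // Unpartitioned jobs are not ordered and start immediately.
        if (partitionKey is null) return Task.Run(job);

        lock (_gate)
        {
            _tails.TryGetValue(partitionKey, out var tail);
            tail ??= Task.CompletedTask;

            // Start this job only after the previous job in the partition has
            // finished, whether it succeeded or failed.
            var next = tail.ContinueWith(_ => job(), TaskScheduler.Default).Unwrap();
            _tails[partitionKey] = next;
            return next;
        }
    }
}
```

The sketch never removes finished tails from the dictionary, which a long-running process would need to handle.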

6. Schedule Recurring Jobs

In Program.cs:

...
var app = builder.Build();

var atomizer = app.Services.GetRequiredService<IAtomizerClient>();

await atomizer.ScheduleRecurringAsync(
    new LoggerJobPayload("Recurring job started", LogLevel.Information),
    "LoggerJob",
    Schedule.EveryMinute
);

...

Contributing

  1. Fork the repository.
  2. Create a new branch (feature/xyz).
  3. Commit your changes with clear messages.
  4. Submit a PR with details of your changes and test coverage.

License

This project is licensed under the MIT License. See the LICENSE file for details.