Skip to content

feat: FIFO partition support for job queues#14

Merged
mnbuhl merged 61 commits intomainfrom
feature/fifo
May 4, 2026
Merged

feat: FIFO partition support for job queues#14
mnbuhl merged 61 commits intomainfrom
feature/fifo

Conversation

@mnbuhl
Copy link
Copy Markdown
Owner

@mnbuhl mnbuhl commented May 4, 2026

Summary

Adds first-class FIFO partition support to Atomizer's job model, public API, and storage contract. Jobs can now be assigned a PartitionKey so all jobs sharing the same key within a queue are processed strictly in order, with at most one job per partition in flight at any time.

  • PartitionKey value object — validated, immutable key scoped to (queue, partition); throws InvalidPartitionKeyException on invalid input
  • Domain model updatesAtomizerJob and AtomizerSchedule carry PartitionKey, SequenceNumber, and IsPartitionBlocked; EnqueueOptions / RecurringOptions expose the new field to callers
  • IAtomizerClientEnqueueAsync and ScheduleRecurringAsync accept an optional partitionKey argument; no breaking change for existing callers
  • IAtomizerStorage FIFO contract — formal <remarks> on InsertAsync (monotonic SequenceNumber assignment) and GetDueJobsAsync (three-rule partition-blocking filter)
  • AtomizerStorageContractTests — abstract xUnit v3 test base (9 [Fact] methods) covering FIFO-07/08/09; Phase 8 (InMemory) and Phase 9 (EF Core) each add one concrete subclass to prove compliance

What's not in this PR

Concrete storage implementations (InMemory FIFO filter, EF Core FIFO SQL) are in the next two phases. The contract test base will have failing tests until those implementations land.

Test plan

  • dotnet build — 0 errors across all target frameworks
  • dotnet test tests/Atomizer.Tests/ — 93 tests passing (net8.0 + net10.0)
  • PartitionKey unit tests: valid/invalid construction, equality, length boundary
  • AtomizerJob partition tests: PartitionKey, SequenceNumber, IsPartitionBlocked property behaviour
  • AtomizerStorageContractTests compiles as abstract base without a concrete subclass

🤖 Generated with Claude Code

mnbuhl and others added 30 commits May 4, 2026 09:04
…eyException

- InvalidPartitionKeyExceptionTests: 5 cases covering all 4 constructors and ArgumentException inheritance
- PartitionKeyTests: 11 cases covering valid/invalid inputs, implicit conversions, equality
…eption

- InvalidPartitionKeyException derives from ArgumentException with all 4 constructors and XML docs
- PartitionKey sealed value object: 255-char cap, IsNullOrWhiteSpace guard, implicit string conversions, ValueObject equality
- Both files follow exact structural analog of JobKey / InvalidJobKeyException
- Add PartitionKey? PartitionKey to EnqueueOptions with XML docs
- Add PartitionKey? PartitionKey to RecurringOptions with XML docs
…AtomizerJob

- Add PartitionKey? PartitionKey property with XML docs
- Add long? SequenceNumber property with XML docs
- Add IsPartitionBlocked computed property (blocks when Processing or Pending+retrying)
- Extend Create() with PartitionKey? partitionKey = null parameter
- Initialize PartitionKey and SequenceNumber = null in Create() object initializer
- Add PartitionKey? PartitionKey property with XML docs
- Extend Create() with PartitionKey? partitionKey = null parameter
- Initialize PartitionKey = partitionKey in Create() object initializer
- Pass partitionKey: options.PartitionKey (named arg) to AtomizerJob.Create() in EnqueueInternalAsync
- Pass options.PartitionKey as last positional arg to AtomizerSchedule.Create() in ScheduleRecurringAsync
- 11 PartitionKeyTests already provided by Plan 01 (PartitionKeyTests.cs)
- 4 AtomizerJobPartitionTests: null partition → false, Pending+0 → false,
  Processing → true, Pending+Attempts>0 → true
- All 15 targeted tests pass; full suite 93 passed on net8.0 and net10.0
Add job.Release and UpdateJobsAsync in the OperationCanceledException
catch branch so partitioned jobs unblock their partition immediately
instead of waiting for the full visibility timeout to expire.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Apply the mandatory project convention that all public implementation
classes are sealed. Both types were introduced in this phase without
the sealed modifier.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add Processing status pre-condition checks matching the guards already
present on Lease, Release, and Attempt, preventing double-complete,
double-fail, and completing/failing a job that was never leased.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace job.PayloadType!.FullName with job.PayloadType?.FullName to
avoid a potential NullReferenceException if a job is ever constructed
without a PayloadType (e.g. from storage deserialization).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Change the implicit string operator to explicit so callers cannot
accidentally trigger InvalidPartitionKeyException through a bare
assignment. Update the corresponding test to use an explicit cast
and rename the test method accordingly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ilities csproj

- Remove duplicate <Nullable>enable</Nullable>
- Add LangVersion 12 and IsTestProject true
- Add xunit.v3.extensibility.core 2.0.1 (library project; not executable)
- Add AwesomeAssertions 9.1.0 and NSubstitute 5.3.0
- Add global Xunit using for [Fact]/[Theory] in abstract base classes
…c and GetDueJobsAsync

- InsertAsync: documents SequenceNumber mutation contract (D-04, D-05, D-06)
  — partitioned jobs get monotonically increasing sequence number scoped to
    (queue, partition key); unpartitioned jobs keep null; idempotency key
    collision assigns existing sequence number
- GetDueJobsAsync: documents three-rule FIFO ordering contract (D-02)
  — at most one job per partition (lowest sequence number)
  — excludes partitions with Processing or retrying Pending jobs
  — unpartitioned jobs unaffected
- Seven [Fact] methods covering FIFO-07 (2), FIFO-08 (2), FIFO-09 (3)
- IAsyncLifetime with ValueTask lifecycle (xUnit v3)
- Protected abstract CreateStorage() factory for backend subclasses
- No concrete subclass — Phases 8 and 9 provide those (D-11)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ness and coverage

- CR-01: thread IAtomizerClock into CreateStorage(clock) so storage uses
  the test-controlled clock instead of its own internal instance
- CR-02: add pre-condition <para> to class XML summary documenting that
  CreateStorage must return a FIFO-compliant implementation
- WR-01: move _now initialisation from field-initializer into
  InitializeAsync so each test gets a fresh, clock-stub-aligned timestamp
- WR-03: add cross-queue partition isolation test
- WR-04: add ReleaseLeasedAsync partition-unblocking test

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- InsertAsync_WhenPartitionedJob_ShouldAssignSequenceNumberStartingAtOne
- InsertAsync_WhenPartitionedJobsInDifferentQueues_ShouldAssignIndependentSequences
- InsertAsync_WhenUnpartitionedJob_ShouldLeaveSequenceNumberNull
- InsertAsync_WhenIdempotencyKeyCollision_ShouldReturnExistingIdAndAssignExistingSequenceNumber
- InsertAsync_WhenIdempotencyKeyCollision_ShouldNotIncreaseJobCount
…ence assignment

- Add _partitionSequences field (ConcurrentDictionary per queue per partition)
- CR-01: idempotency check before sequence assignment - returns existing Id
- FIFO-09: assign monotonically increasing SequenceNumber per (queue, partitionKey)
- Unpartitioned jobs leave SequenceNumber null
- On collision: assigns existing.SequenceNumber to passed-in job, skips insert
- GetDueJobsAsync_WhenTwoJobsSharePartition_ShouldReturnOnlyLowestSequenceNumber
- GetDueJobsAsync_WhenPartitionHeadAndUnpartitionedJobExist_ShouldReturnBoth
- GetDueJobsAsync_WhenPartitionJobIsProcessing_ShouldReturnEmpty
- GetDueJobsAsync_WhenPartitionJobIsPendingWithAttempts_ShouldReturnEmpty
- GetDueJobsAsync_WhenQueueABlockedPartitionSameKeyAsQueueB_ShouldReturnQueueBJobUnaffected
- GetDueJobsAsync_WhenProcessingJobHasExpiredVisibleAt_ShouldReturnIt
…ilter

- Pass 1: collect blocked partition keys scanning full queue snapshot
- Pass 2: filter eligible candidates excluding blocked partitions
- Pass 3: FIFO head-of-partition selection via OrderBy(SequenceNumber).First()
- netstandard2.0-safe: OrderBy().First() instead of MinBy()
- Unpartitioned jobs returned alongside partition heads
…izerJob.Create

- Pass partitionKey: schedule.PartitionKey as final named argument
- Ensures scheduled jobs inherit partition key from the recurring schedule
- Format InMemoryStorage.cs to comply with CSharpier (printWidth: 120)
- FIFO InsertAsync with CR-01 idempotency fix and sequence assignment
- FIFO GetDueJobsAsync three-pass partition blocking filter
- ScheduleProcessor partitionKey forwarding
- 11 new unit tests added (TDD: RED + GREEN gates satisfied)
- 104/104 tests passing
- Implements CreateStorage(IAtomizerClock) returning new InMemoryStorage
- Inherits all 9 FIFO contract tests from AtomizerStorageContractTests
- AmountOfJobsToRetainInMemory=100 ensures terminal jobs retained during tests
- All 9 inherited tests pass (113/113 total suite green on net8.0)
- GetDueJobsAsync_WhenPartitionHeadCompleted_ShouldUnblockNextJob
- GetDueJobsAsync_WhenPartitionHeadFailed_ShouldUnblockNextJob
mnbuhl and others added 22 commits May 4, 2026 18:04
… in all three dialects

- Replace flat WHERE in GetDueJobs with blocked_partitions + partition_heads CTE for FIFO-aware acquisition
- PostgreSQL: FOR NO KEY UPDATE SKIP LOCKED on outer SELECT, derived-table subquery in InsertJobWithSequence
- SQL Server: WITH (UPDLOCK, READPAST, ROWLOCK) on outer FROM only, TOP(batchSize) via string interpolation
- MySQL: LEFT JOIN anti-join in partition_heads CTE, derived-table form for COALESCE(MAX) to avoid same-table restriction
- All three providers: atomic COALESCE(MAX(SequenceNumber), 0) + 1 sequence assignment per (queue, partition_key)
…-01 idempotency

- Add partitioned insert branch: when job.PartitionKey != null and supported provider, delegate to dialect.InsertJobWithSequence and read back assigned SequenceNumber
- CR-01 fix: assign job.SequenceNumber = existing.SequenceNumber on idempotency collision so caller receives the originally assigned sequence
- Unpartitioned path (JobEntities.Add + SaveChangesAsync) unchanged
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ests

- PostgresStorageContractTests: wires to PostgreSqlDatabaseFixture
- SqlServerStorageContractTests: wires to SqlServerDatabaseFixture
- MySqlStorageContractTests: wires to MySqlDatabaseFixture
- Each creates fresh EntityFrameworkCoreStorage<TDbContext> per test
- Each cleans up all 3 entity sets and disposes DbContext in DisposeAsync
…ontext

- Add SqliteStorageContractTests with AllowUnsafeProviderFallback=true
- Fix DisposeAsync in all 4 contract subclasses to use a fresh DbContext
  from the fixture for cleanup, avoiding EF change-tracker conflicts
- Fix EntityFrameworkCoreStorage.UpdateJobsAsync: call ChangeTracker.Clear()
  before UpdateRange to prevent identity-conflict when entities were tracked
  by a prior InsertAsync on the same DbContext instance
- Enhance LINQ fallback GetDueJobsAsync with full FIFO partition-head and
  blocking logic so all 11 contract tests pass for SQLite
- Enhance LINQ fallback InsertAsync to assign SequenceNumber atomically
  via MAX(SequenceNumber)+1 for partitioned jobs on unsupported providers
The partition_heads CTE was computing MIN(SequenceNumber) across all
jobs in a partition (including Completed and Failed), causing terminal
jobs to remain the "head" and block the next job indefinitely.

Fix: add eligibility filter (Pending due OR Processing-expired) to the
partition_heads CTE in all three SQL dialects so only actionable jobs
are considered when selecting the head-of-partition. This makes
FIFO-13 terminal-state unblocking work end-to-end.

Exposed by the new provider contract tests (GetDueJobsAsync_WhenPartition
HeadCompleted/Failed_ShouldUnblockNextJob).
Add .Include(j => j.Errors) to both the FromSqlInterpolated supported-provider
path and the LINQ fallback path so AtomizerJobEntity.Errors is populated when
returned from GetDueJobsAsync instead of always being an empty list.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace bare column references (colStatus, colVisibleAt, colScheduledAt)
with t.-prefixed aliases in the outer WHERE clause of GetDueJobs across
all three SQL dialects (PostgreSQL, MySQL, SQL Server). This prevents
ambiguous-column errors when partition_heads CTE exposes columns of the
same name, making the SQL correct by design rather than by coincidence.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Mirror the same guard already present in UpdateJobsAsync to prevent
InvalidOperationException when an AtomizerScheduleEntity is already
tracked from a prior UpsertScheduleAsync call on the same DbContext.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
RetryStrategy.None serializes to [0ms] (length 1), so the length == 0
branch is not the normal None round-trip path — it is a defensive
fallback for corrupt rows with an empty RetryIntervals column. Without
the guard, RetryStrategy.Intervals([]) would throw. Add a comment
explaining the semantics to prevent future "simplification" that removes
the guard.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add a filtered unique index on AtomizerJobEntity.IdempotencyKey (NULL rows
excluded) to enforce the idempotency constraint at the database level rather
than relying on a TOCTOU read-then-insert check. Handle the resulting
DbUpdateException in InsertAsync by re-querying for the winning concurrent
insert and returning its ID, matching the semantics of the pre-check path.
Remove the @todo comment now that the constraint is implemented.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Change QueueKey HasMaxLength from 512 to 100 (domain max) and
ScheduleJobKey from 512 to 255 (JobKey domain max). LeaseToken and
IdempotencyKey remain at 512 since they have no strict domain cap.
Inline comments explain each size choice.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ontract

When the lease is not acquired the generic overload returns default(TResult),
which is indistinguishable from a callback that returned null. Add a <remarks>
block documenting this behavior and directing callers that do not need a return
value to use the non-generic overload instead. A breaking signature change to
return a discriminated result type is deferred to the next major version.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…act tests

In all four provider contract test classes: delete AtomizerJobErrors before
AtomizerJobs to respect the FK constraint (previously reversed), and pass a
30-second CancellationTokenSource token to SaveChangesAsync so teardown
does not hang indefinitely if the container is slow or the connection drops.
ExecuteDeleteAsync is not used as the test project targets EF Core 6.0.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…reated) and fix CR-01 eager-load via two-step query

HasFilter with nameof() produces unquoted column references that PostgreSQL
rejects during EnsureCreatedAsync schema creation. Reverted the partial unique
index and its DbUpdateException catch block entirely — the TOCTOU race remains
a known limitation (tracked as deferred tech debt).

CR-01 fix corrected: FromSqlInterpolated with CTEs is non-composable, so
Include() is forbidden by EF Core. Replaced with a two-step load: raw SQL
for the job rows, then a second query for their errors grouped by job ID.

All 44 StorageContractTests pass on net8.0 and net10.0.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
These files should never have been committed — .planning/ is already
in .gitignore. Untracking all four summary files that slipped through.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@mnbuhl mnbuhl marked this pull request as ready for review May 4, 2026 19:05
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: c68a996439

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread src/Atomizer/Processing/JobProcessor.cs Outdated
Comment on lines +67 to +70
// Release the job so its partition (if any) is not blocked for the full visibility timeout.
job.Release(_clock.UtcNow);
using var scope = _serviceScopeFactory.CreateScope();
await scope.Storage.UpdateJobsAsync(new[] { job }, CancellationToken.None);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Do not release canceled jobs as plain pending retries

ProcessAsync increments attempts before dispatch, then the cancellation path calls job.Release(...); that leaves partitioned jobs in Pending with Attempts > 0, which IsPartitionBlocked treats as a blocked partition. In this state there is no lease token or retry schedule to transition the job again, so the canceled head job (and its partition) can remain permanently ineligible instead of being recoverable after cancellation.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 30fc0be. Before calling Release(), we now undo the Attempt() increment (job.Attempts -= 1) so the job returns to Pending + Attempts = 0. This means IsPartitionBlocked correctly treats it as unblocked and the partition stays eligible for the next poll cycle.

{8}, {9}, {10},
{11}, {12}, {13},
{14},
COALESCE((SELECT MAX({{_jSequenceNumber}}) FROM {{_jTable}} WHERE {{_jQueueKey}} = {15} AND {{_jPartitionKey}} = {16}), 0) + 1;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Serialize sequence assignment for partitioned EF inserts

This insert assigns sequence numbers via MAX(sequence)+1 without any per-partition lock or uniqueness guarantee, so concurrent enqueues in the same (queue, partition) can receive the same SequenceNumber. Once duplicates exist, the partition-head query (MIN(sequence) join) can return multiple jobs from one partition in the same batch, violating the FIFO/one-in-flight contract; the same pattern is present in the PostgreSQL and MySQL dialect implementations in this commit.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 30fc0be for all three dialects. The MAX(sequence)+1 subquery now acquires a row lock before reading, serializing concurrent inserts into the same (queue, partition):

  • SQL Server: WITH (UPDLOCK, HOLDLOCK) — locks existing rows and the gap, preventing phantom reads when no rows exist yet
  • PostgreSQL: FOR NO KEY UPDATE inside the inner subquery — locks matching rows before the aggregate
  • MySQL: FOR UPDATE inside the inner subquery — same approach

This ensures two concurrent InsertAsync calls on the same partition cannot both read the same MAX and assign duplicate sequence numbers.

mnbuhl and others added 3 commits May 4, 2026 21:25
CR-01: Undo Attempt() increment before Release() on cancellation — a
cancelled job is not a failed attempt, and Pending+Attempts>0 was
causing the partition to be treated as blocked indefinitely.

CR-02: Add row-locking to the MAX(sequence)+1 subquery in all three
SQL dialects to prevent duplicate sequence numbers under concurrent
partitioned inserts. Uses WITH (UPDLOCK, HOLDLOCK) for SQL Server,
FOR NO KEY UPDATE for PostgreSQL, and FOR UPDATE for MySQL.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Deleted all existing migrations and regenerated from scratch to reflect
the current entity model (FIFO PartitionKey + SequenceNumber columns).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add ProcessStockEventJob handler demonstrating PartitionKey usage:
  stock events for the same product are processed in strict FIFO order
- Add POST /stock-events endpoint that enqueues with PartitionKey set
  to the ProductId, serializing per-product processing
- README: move FIFO from Planned Features to Features, add Quick Start
  section 5 explaining partition key usage with a code example

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@mnbuhl mnbuhl merged commit b166f31 into main May 4, 2026
1 check passed
@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 4, 2026

Code Coverage

Package Line Rate Health
Atomizer 71%
Atomizer 13%
Atomizer.EntityFrameworkCore 90%
Atomizer 13%
Atomizer.EntityFrameworkCore 90%
Atomizer 13%
Atomizer.EntityFrameworkCore 90%
Atomizer 71%
Atomizer 71%
Summary 60% (6707 / 11892)

@mnbuhl mnbuhl deleted the feature/fifo branch May 5, 2026 06:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant