Conversation
…eyException - InvalidPartitionKeyExceptionTests: 5 cases covering all 4 constructors and ArgumentException inheritance - PartitionKeyTests: 11 cases covering valid/invalid inputs, implicit conversions, equality
…eption - InvalidPartitionKeyException derives from ArgumentException with all 4 constructors and XML docs - PartitionKey sealed value object: 255-char cap, IsNullOrWhiteSpace guard, implicit string conversions, ValueObject equality - Both files follow exact structural analog of JobKey / InvalidJobKeyException
- Add PartitionKey? PartitionKey to EnqueueOptions with XML docs - Add PartitionKey? PartitionKey to RecurringOptions with XML docs
…AtomizerJob - Add PartitionKey? PartitionKey property with XML docs - Add long? SequenceNumber property with XML docs - Add IsPartitionBlocked computed property (blocks when Processing or Pending+retrying) - Extend Create() with PartitionKey? partitionKey = null parameter - Initialize PartitionKey and SequenceNumber = null in Create() object initializer
- Add PartitionKey? PartitionKey property with XML docs - Extend Create() with PartitionKey? partitionKey = null parameter - Initialize PartitionKey = partitionKey in Create() object initializer
- Pass partitionKey: options.PartitionKey (named arg) to AtomizerJob.Create() in EnqueueInternalAsync - Pass options.PartitionKey as last positional arg to AtomizerSchedule.Create() in ScheduleRecurringAsync
- 11 PartitionKeyTests already provided by Plan 01 (PartitionKeyTests.cs) - 4 AtomizerJobPartitionTests: null partition → false, Pending+0 → false, Processing → true, Pending+Attempts>0 → true - All 15 targeted tests pass; full suite 93 passed on net8.0 and net10.0
Add job.Release and UpdateJobsAsync in the OperationCanceledException catch branch so partitioned jobs unblock their partition immediately instead of waiting for the full visibility timeout to expire. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Apply the mandatory project convention that all public implementation classes are sealed. Both types were introduced in this phase without the sealed modifier. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add Processing status pre-condition checks matching the guards already present on Lease, Release, and Attempt, preventing double-complete, double-fail, and completing/failing a job that was never leased. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace job.PayloadType!.FullName with job.PayloadType?.FullName to avoid a potential NullReferenceException if a job is ever constructed without a PayloadType (e.g. from storage deserialization). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Change the implicit string operator to explicit so callers cannot accidentally trigger InvalidPartitionKeyException through a bare assignment. Update the corresponding test to use an explicit cast and rename the test method accordingly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ilities csproj - Remove duplicate <Nullable>enable</Nullable> - Add LangVersion 12 and IsTestProject true - Add xunit.v3.extensibility.core 2.0.1 (library project; not executable) - Add AwesomeAssertions 9.1.0 and NSubstitute 5.3.0 - Add global Xunit using for [Fact]/[Theory] in abstract base classes
…c and GetDueJobsAsync
- InsertAsync: documents SequenceNumber mutation contract (D-04, D-05, D-06)
— partitioned jobs get monotonically increasing sequence number scoped to
(queue, partition key); unpartitioned jobs keep null; idempotency key
collision assigns existing sequence number
- GetDueJobsAsync: documents three-rule FIFO ordering contract (D-02)
— at most one job per partition (lowest sequence number)
— excludes partitions with Processing or retrying Pending jobs
— unpartitioned jobs unaffected
- Seven [Fact] methods covering FIFO-07 (2), FIFO-08 (2), FIFO-09 (3) - IAsyncLifetime with ValueTask lifecycle (xUnit v3) - Protected abstract CreateStorage() factory for backend subclasses - No concrete subclass — Phases 8 and 9 provide those (D-11)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ness and coverage - CR-01: thread IAtomizerClock into CreateStorage(clock) so storage uses the test-controlled clock instead of its own internal instance - CR-02: add pre-condition <para> to class XML summary documenting that CreateStorage must return a FIFO-compliant implementation - WR-01: move _now initialisation from field-initializer into InitializeAsync so each test gets a fresh, clock-stub-aligned timestamp - WR-03: add cross-queue partition isolation test - WR-04: add ReleaseLeasedAsync partition-unblocking test Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- InsertAsync_WhenPartitionedJob_ShouldAssignSequenceNumberStartingAtOne - InsertAsync_WhenPartitionedJobsInDifferentQueues_ShouldAssignIndependentSequences - InsertAsync_WhenUnpartitionedJob_ShouldLeaveSequenceNumberNull - InsertAsync_WhenIdempotencyKeyCollision_ShouldReturnExistingIdAndAssignExistingSequenceNumber - InsertAsync_WhenIdempotencyKeyCollision_ShouldNotIncreaseJobCount
…ence assignment - Add _partitionSequences field (ConcurrentDictionary per queue per partition) - CR-01: idempotency check before sequence assignment - returns existing Id - FIFO-09: assign monotonically increasing SequenceNumber per (queue, partitionKey) - Unpartitioned jobs leave SequenceNumber null - On collision: assigns existing.SequenceNumber to passed-in job, skips insert
- GetDueJobsAsync_WhenTwoJobsSharePartition_ShouldReturnOnlyLowestSequenceNumber - GetDueJobsAsync_WhenPartitionHeadAndUnpartitionedJobExist_ShouldReturnBoth - GetDueJobsAsync_WhenPartitionJobIsProcessing_ShouldReturnEmpty - GetDueJobsAsync_WhenPartitionJobIsPendingWithAttempts_ShouldReturnEmpty - GetDueJobsAsync_WhenQueueABlockedPartitionSameKeyAsQueueB_ShouldReturnQueueBJobUnaffected - GetDueJobsAsync_WhenProcessingJobHasExpiredVisibleAt_ShouldReturnIt
…ilter - Pass 1: collect blocked partition keys scanning full queue snapshot - Pass 2: filter eligible candidates excluding blocked partitions - Pass 3: FIFO head-of-partition selection via OrderBy(SequenceNumber).First() - netstandard2.0-safe: OrderBy().First() instead of MinBy() - Unpartitioned jobs returned alongside partition heads
…izerJob.Create - Pass partitionKey: schedule.PartitionKey as final named argument - Ensures scheduled jobs inherit partition key from the recurring schedule - Format InMemoryStorage.cs to comply with CSharpier (printWidth: 120)
- FIFO InsertAsync with CR-01 idempotency fix and sequence assignment - FIFO GetDueJobsAsync three-pass partition blocking filter - ScheduleProcessor partitionKey forwarding - 11 new unit tests added (TDD: RED + GREEN gates satisfied) - 104/104 tests passing
- Implements CreateStorage(IAtomizerClock) returning new InMemoryStorage - Inherits all 9 FIFO contract tests from AtomizerStorageContractTests - AmountOfJobsToRetainInMemory=100 ensures terminal jobs retained during tests - All 9 inherited tests pass (113/113 total suite green on net8.0)
- GetDueJobsAsync_WhenPartitionHeadCompleted_ShouldUnblockNextJob - GetDueJobsAsync_WhenPartitionHeadFailed_ShouldUnblockNextJob
… in all three dialects - Replace flat WHERE in GetDueJobs with blocked_partitions + partition_heads CTE for FIFO-aware acquisition - PostgreSQL: FOR NO KEY UPDATE SKIP LOCKED on outer SELECT, derived-table subquery in InsertJobWithSequence - SQL Server: WITH (UPDLOCK, READPAST, ROWLOCK) on outer FROM only, TOP(batchSize) via string interpolation - MySQL: LEFT JOIN anti-join in partition_heads CTE, derived-table form for COALESCE(MAX) to avoid same-table restriction - All three providers: atomic COALESCE(MAX(SequenceNumber), 0) + 1 sequence assignment per (queue, partition_key)
…-01 idempotency - Add partitioned insert branch: when job.PartitionKey != null and supported provider, delegate to dialect.InsertJobWithSequence and read back assigned SequenceNumber - CR-01 fix: assign job.SequenceNumber = existing.SequenceNumber on idempotency collision so caller receives the originally assigned sequence - Unpartitioned path (JobEntities.Add + SaveChangesAsync) unchanged
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ests - PostgresStorageContractTests: wires to PostgreSqlDatabaseFixture - SqlServerStorageContractTests: wires to SqlServerDatabaseFixture - MySqlStorageContractTests: wires to MySqlDatabaseFixture - Each creates fresh EntityFrameworkCoreStorage<TDbContext> per test - Each cleans up all 3 entity sets and disposes DbContext in DisposeAsync
…ontext - Add SqliteStorageContractTests with AllowUnsafeProviderFallback=true - Fix DisposeAsync in all 4 contract subclasses to use a fresh DbContext from the fixture for cleanup, avoiding EF change-tracker conflicts - Fix EntityFrameworkCoreStorage.UpdateJobsAsync: call ChangeTracker.Clear() before UpdateRange to prevent identity-conflict when entities were tracked by a prior InsertAsync on the same DbContext instance - Enhance LINQ fallback GetDueJobsAsync with full FIFO partition-head and blocking logic so all 11 contract tests pass for SQLite - Enhance LINQ fallback InsertAsync to assign SequenceNumber atomically via MAX(SequenceNumber)+1 for partitioned jobs on unsupported providers
The partition_heads CTE was computing MIN(SequenceNumber) across all jobs in a partition (including Completed and Failed), causing terminal jobs to remain the "head" and block the next job indefinitely. Fix: add eligibility filter (Pending due OR Processing-expired) to the partition_heads CTE in all three SQL dialects so only actionable jobs are considered when selecting the head-of-partition. This makes FIFO-13 terminal-state unblocking work end-to-end. Exposed by the new provider contract tests (GetDueJobsAsync_WhenPartition HeadCompleted/Failed_ShouldUnblockNextJob).
Add .Include(j => j.Errors) to both the FromSqlInterpolated supported-provider path and the LINQ fallback path so AtomizerJobEntity.Errors is populated when returned from GetDueJobsAsync instead of always being an empty list. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace bare column references (colStatus, colVisibleAt, colScheduledAt) with t.-prefixed aliases in the outer WHERE clause of GetDueJobs across all three SQL dialects (PostgreSQL, MySQL, SQL Server). This prevents ambiguous-column errors when partition_heads CTE exposes columns of the same name, making the SQL correct by design rather than by coincidence. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Mirror the same guard already present in UpdateJobsAsync to prevent InvalidOperationException when an AtomizerScheduleEntity is already tracked from a prior UpsertScheduleAsync call on the same DbContext. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
RetryStrategy.None serializes to [0ms] (length 1), so the length == 0 branch is not the normal None round-trip path — it is a defensive fallback for corrupt rows with an empty RetryIntervals column. Without the guard, RetryStrategy.Intervals([]) would throw. Add a comment explaining the semantics to prevent future "simplification" that removes the guard. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add a filtered unique index on AtomizerJobEntity.IdempotencyKey (NULL rows excluded) to enforce the idempotency constraint at the database level rather than relying on a TOCTOU read-then-insert check. Handle the resulting DbUpdateException in InsertAsync by re-querying for the winning concurrent insert and returning its ID, matching the semantics of the pre-check path. Remove the @todo comment now that the constraint is implemented. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Change QueueKey HasMaxLength from 512 to 100 (domain max) and ScheduleJobKey from 512 to 255 (JobKey domain max). LeaseToken and IdempotencyKey remain at 512 since they have no strict domain cap. Inline comments explain each size choice. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ontract When the lease is not acquired the generic overload returns default(TResult), which is indistinguishable from a callback that returned null. Add a <remarks> block documenting this behavior and directing callers that do not need a return value to use the non-generic overload instead. A breaking signature change to return a discriminated result type is deferred to the next major version. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…act tests In all four provider contract test classes: delete AtomizerJobErrors before AtomizerJobs to respect the FK constraint (previously reversed), and pass a 30-second CancellationTokenSource token to SaveChangesAsync so teardown does not hang indefinitely if the container is slow or the connection drops. ExecuteDeleteAsync is not used as the test project targets EF Core 6.0. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…reated) and fix CR-01 eager-load via two-step query HasFilter with nameof() produces unquoted column references that PostgreSQL rejects during EnsureCreatedAsync schema creation. Reverted the partial unique index and its DbUpdateException catch block entirely — the TOCTOU race remains a known limitation (tracked as deferred tech debt). CR-01 fix corrected: FromSqlInterpolated with CTEs is non-composable, so Include() is forbidden by EF Core. Replaced with a two-step load: raw SQL for the job rows, then a second query for their errors grouped by job ID. All 44 StorageContractTests pass on net8.0 and net10.0. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
These files should never have been committed — .planning/ is already in .gitignore. Untracking all four summary files that slipped through. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c68a996439
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| // Release the job so its partition (if any) is not blocked for the full visibility timeout. | ||
| job.Release(_clock.UtcNow); | ||
| using var scope = _serviceScopeFactory.CreateScope(); | ||
| await scope.Storage.UpdateJobsAsync(new[] { job }, CancellationToken.None); |
There was a problem hiding this comment.
Do not release canceled jobs as plain pending retries
ProcessAsync increments attempts before dispatch, then the cancellation path calls job.Release(...); that leaves partitioned jobs in Pending with Attempts > 0, which IsPartitionBlocked treats as a blocked partition. In this state there is no lease token or retry schedule to transition the job again, so the canceled head job (and its partition) can remain permanently ineligible instead of being recoverable after cancellation.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed in 30fc0be. Before calling Release(), we now undo the Attempt() increment (job.Attempts -= 1) so the job returns to Pending + Attempts = 0. This means IsPartitionBlocked correctly treats it as unblocked and the partition stays eligible for the next poll cycle.
| {8}, {9}, {10}, | ||
| {11}, {12}, {13}, | ||
| {14}, | ||
| COALESCE((SELECT MAX({{_jSequenceNumber}}) FROM {{_jTable}} WHERE {{_jQueueKey}} = {15} AND {{_jPartitionKey}} = {16}), 0) + 1; |
There was a problem hiding this comment.
Serialize sequence assignment for partitioned EF inserts
This insert assigns sequence numbers via MAX(sequence)+1 without any per-partition lock or uniqueness guarantee, so concurrent enqueues in the same (queue, partition) can receive the same SequenceNumber. Once duplicates exist, the partition-head query (MIN(sequence) join) can return multiple jobs from one partition in the same batch, violating the FIFO/one-in-flight contract; the same pattern is present in the PostgreSQL and MySQL dialect implementations in this commit.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed in 30fc0be for all three dialects. The MAX(sequence)+1 subquery now acquires a row lock before reading, serializing concurrent inserts into the same (queue, partition):
- SQL Server:
WITH (UPDLOCK, HOLDLOCK)— locks existing rows and the gap, preventing phantom reads when no rows exist yet - PostgreSQL:
FOR NO KEY UPDATEinside the inner subquery — locks matching rows before the aggregate - MySQL:
FOR UPDATEinside the inner subquery — same approach
This ensures two concurrent InsertAsync calls on the same partition cannot both read the same MAX and assign duplicate sequence numbers.
CR-01: Undo Attempt() increment before Release() on cancellation — a cancelled job is not a failed attempt, and Pending+Attempts>0 was causing the partition to be treated as blocked indefinitely. CR-02: Add row-locking to the MAX(sequence)+1 subquery in all three SQL dialects to prevent duplicate sequence numbers under concurrent partitioned inserts. Uses WITH (UPDLOCK, HOLDLOCK) for SQL Server, FOR NO KEY UPDATE for PostgreSQL, and FOR UPDATE for MySQL. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Deleted all existing migrations and regenerated from scratch to reflect the current entity model (FIFO PartitionKey + SequenceNumber columns). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add ProcessStockEventJob handler demonstrating PartitionKey usage: stock events for the same product are processed in strict FIFO order - Add POST /stock-events endpoint that enqueues with PartitionKey set to the ProductId, serializing per-product processing - README: move FIFO from Planned Features to Features, add Quick Start section 5 explaining partition key usage with a code example Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Summary
Adds first-class FIFO partition support to Atomizer's job model, public API, and storage contract. Jobs can now be assigned a
PartitionKeyso all jobs sharing the same key within a queue are processed strictly in order, with at most one job per partition in flight at any time.PartitionKeyvalue object — validated, immutable key scoped to (queue, partition); throwsInvalidPartitionKeyExceptionon invalid inputAtomizerJobandAtomizerSchedulecarryPartitionKey,SequenceNumber, andIsPartitionBlocked;EnqueueOptions/RecurringOptionsexpose the new field to callersIAtomizerClient—EnqueueAsyncandScheduleRecurringAsyncaccept an optionalpartitionKeyargument; no breaking change for existing callersIAtomizerStorageFIFO contract — formal<remarks>onInsertAsync(monotonicSequenceNumberassignment) andGetDueJobsAsync(three-rule partition-blocking filter)AtomizerStorageContractTests— abstract xUnit v3 test base (9[Fact]methods) covering FIFO-07/08/09; Phase 8 (InMemory) and Phase 9 (EF Core) each add one concrete subclass to prove complianceWhat's not in this PR
Concrete storage implementations (InMemory FIFO filter, EF Core FIFO SQL) are in the next two phases. The contract test base will have failing tests until those implementations land.
Test plan
dotnet build— 0 errors across all target frameworksdotnet test tests/Atomizer.Tests/— 93 tests passing (net8.0 + net10.0)PartitionKeyunit tests: valid/invalid construction, equality, length boundaryAtomizerJobpartition tests:PartitionKey,SequenceNumber,IsPartitionBlockedproperty behaviourAtomizerStorageContractTestscompiles as abstract base without a concrete subclass🤖 Generated with Claude Code