From c6e49d2c367dd4cd5ed27713b5343e0999d547ce Mon Sep 17 00:00:00 2001 From: mnbuhl Date: Mon, 4 May 2026 23:13:17 +0200 Subject: [PATCH 1/4] fix: return all eligible FIFO partition jobs per sweep instead of one Previously GetDueJobsAsync returned at most one job per partition per sweep via a partition_heads CTE that joined on MIN(sequence_number). This capped throughput to one job per partition per poll cycle. The fix removes the partition_heads CTE and replaces it with a simple blocked_partitions filter: any partition with a Processing job or a Pending job with prior attempts is excluded entirely; all other partitions contribute all their eligible jobs up to the batch size cap. Ordering by (scheduled_at, sequence_number) preserves FIFO delivery. Co-Authored-By: Claude Sonnet 4.6 --- .../Providers/Sql/MySqlDialect.cs | 59 ++++++------------- .../Providers/Sql/PostgreSqlDialect.cs | 58 ++++++------------ .../Providers/Sql/SqlServerDialect.cs | 56 ++++++------------ .../Storage/EntityFrameworkCoreStorage.cs | 27 +++------ src/Atomizer/Abstractions/IAtomizerStorage.cs | 3 +- src/Atomizer/Storage/InMemoryStorage.cs | 10 +--- .../AtomizerStorageContractTests.cs | 42 ++++++++++--- .../Storage/InMemoryStorageTests.cs | 7 ++- 8 files changed, 105 insertions(+), 157 deletions(-) diff --git a/src/Atomizer.EntityFrameworkCore/Providers/Sql/MySqlDialect.cs b/src/Atomizer.EntityFrameworkCore/Providers/Sql/MySqlDialect.cs index 0db2aca..f4a6df3 100644 --- a/src/Atomizer.EntityFrameworkCore/Providers/Sql/MySqlDialect.cs +++ b/src/Atomizer.EntityFrameworkCore/Providers/Sql/MySqlDialect.cs @@ -17,66 +17,45 @@ SELECT DISTINCT {{_jPartitionKey}} {{_jStatus}} = {{_statusProcessing}} OR ({{_jStatus}} = {{_statusPending}} AND {{_jAttempts}} > 0) ) - ), - partition_heads AS ( - SELECT j.{{_jPartitionKey}}, MIN(j.{{_jSequenceNumber}}) AS min_seq - FROM {{_jTable}} AS j - LEFT JOIN blocked_partitions bp ON j.{{_jPartitionKey}} = bp.{{_jPartitionKey}} - WHERE j.{{_jQueueKey}} = {1} - AND j.{{_jPartitionKey}} IS NOT NULL - AND bp.{{_jPartitionKey}} IS NULL - AND ( - (j.{{_jStatus}} = {{_statusPending}} - AND (j.{{_jVisibleAt}} IS NULL OR j.{{_jVisibleAt}} <= {10}) - AND j.{{_jScheduledAt}} <= {11}) - OR (j.{{_jStatus}} = {{_statusProcessing}} AND j.{{_jVisibleAt}} <= {12}) - ) - GROUP BY j.{{_jPartitionKey}} ) SELECT t.* FROM {{_jTable}} AS t - LEFT JOIN partition_heads ph - ON t.{{_jPartitionKey}} = ph.{{_jPartitionKey}} - AND t.{{_jSequenceNumber}} = ph.min_seq - WHERE t.{{_jQueueKey}} = {2} + WHERE t.{{_jQueueKey}} = {1} AND ( (t.{{_jPartitionKey}} IS NULL AND ( (t.{{_jStatus}} = {{_statusPending}} - AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {3}) - AND t.{{_jScheduledAt}} <= {4}) - OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {5}) + AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {2}) + AND t.{{_jScheduledAt}} <= {3}) + OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {4}) ) ) OR - (t.{{_jPartitionKey}} IS NOT NULL AND ph.min_seq IS NOT NULL + (t.{{_jPartitionKey}} IS NOT NULL + AND t.{{_jPartitionKey}} NOT IN (SELECT {{_jPartitionKey}} FROM blocked_partitions) AND ( (t.{{_jStatus}} = {{_statusPending}} - AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {6}) - AND t.{{_jScheduledAt}} <= {7}) - OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {8}) + AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {5}) + AND t.{{_jScheduledAt}} <= {6}) + OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {7}) ) ) ) - ORDER BY t.{{_jScheduledAt}}, t.{{_jId}} - LIMIT {9} + ORDER BY t.{{_jScheduledAt}}, t.{{_jSequenceNumber}}, t.{{_jId}} + LIMIT {8} FOR UPDATE SKIP LOCKED; """; return FormattableStringFactory.Create( format, queueKey.Key, // {0} blocked_partitions queue filter - queueKey.Key, // {1} partition_heads queue filter - queueKey.Key, // {2} outer SELECT queue filter - now, // {3} unpartitioned VisibleAt - now, // {4} unpartitioned ScheduledAt - now, // {5} unpartitioned Processing VisibleAt - now, // {6} partitioned VisibleAt - now, // {7} partitioned ScheduledAt - now, // {8} partitioned Processing VisibleAt - batchSize, // {9} LIMIT - now, // {10} partition_heads VisibleAt - now, // {11} partition_heads ScheduledAt - now // {12} partition_heads Processing VisibleAt + queueKey.Key, // {1} outer SELECT queue filter + now, // {2} unpartitioned VisibleAt + now, // {3} unpartitioned ScheduledAt + now, // {4} unpartitioned Processing VisibleAt + now, // {5} partitioned VisibleAt + now, // {6} partitioned ScheduledAt + now, // {7} partitioned Processing VisibleAt + batchSize // {8} LIMIT ); } diff --git a/src/Atomizer.EntityFrameworkCore/Providers/Sql/PostgreSqlDialect.cs b/src/Atomizer.EntityFrameworkCore/Providers/Sql/PostgreSqlDialect.cs index 912480e..22cd2e2 100644 --- a/src/Atomizer.EntityFrameworkCore/Providers/Sql/PostgreSqlDialect.cs +++ b/src/Atomizer.EntityFrameworkCore/Providers/Sql/PostgreSqlDialect.cs @@ -17,65 +17,45 @@ SELECT DISTINCT {{_jPartitionKey}} {{_jStatus}} = {{_statusProcessing}} OR ({{_jStatus}} = {{_statusPending}} AND {{_jAttempts}} > 0) ) - ), - partition_heads AS ( - SELECT {{_jPartitionKey}}, MIN({{_jSequenceNumber}}) AS min_seq - FROM {{_jTable}} - WHERE {{_jQueueKey}} = {1} - AND {{_jPartitionKey}} IS NOT NULL - AND {{_jPartitionKey}} NOT IN (SELECT {{_jPartitionKey}} FROM blocked_partitions) - AND ( - ({{_jStatus}} = {{_statusPending}} - AND ({{_jVisibleAt}} IS NULL OR {{_jVisibleAt}} <= {10}) - AND {{_jScheduledAt}} <= {11}) - OR ({{_jStatus}} = {{_statusProcessing}} AND {{_jVisibleAt}} <= {12}) - ) - GROUP BY {{_jPartitionKey}} ) SELECT t.* FROM {{_jTable}} AS t - LEFT JOIN partition_heads ph - ON t.{{_jPartitionKey}} = ph.{{_jPartitionKey}} - AND t.{{_jSequenceNumber}} = ph.min_seq - WHERE t.{{_jQueueKey}} = {2} + WHERE t.{{_jQueueKey}} = {1} AND ( (t.{{_jPartitionKey}} IS NULL AND ( (t.{{_jStatus}} = {{_statusPending}} - AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {3}) - AND t.{{_jScheduledAt}} <= {4}) - OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {5}) + AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {2}) + AND t.{{_jScheduledAt}} <= {3}) + OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {4}) ) ) OR - (t.{{_jPartitionKey}} IS NOT NULL AND ph.min_seq IS NOT NULL + (t.{{_jPartitionKey}} IS NOT NULL + AND t.{{_jPartitionKey}} NOT IN (SELECT {{_jPartitionKey}} FROM blocked_partitions) AND ( (t.{{_jStatus}} = {{_statusPending}} - AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {6}) - AND t.{{_jScheduledAt}} <= {7}) - OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {8}) + AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {5}) + AND t.{{_jScheduledAt}} <= {6}) + OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {7}) ) ) ) - ORDER BY t.{{_jScheduledAt}}, t.{{_jId}} - LIMIT {9} + ORDER BY t.{{_jScheduledAt}}, t.{{_jSequenceNumber}}, t.{{_jId}} + LIMIT {8} FOR NO KEY UPDATE SKIP LOCKED; """; return FormattableStringFactory.Create( format, queueKey.Key, // {0} blocked_partitions queue filter - queueKey.Key, // {1} partition_heads queue filter - queueKey.Key, // {2} outer SELECT queue filter - now, // {3} unpartitioned VisibleAt - now, // {4} unpartitioned ScheduledAt - now, // {5} unpartitioned Processing VisibleAt - now, // {6} partitioned VisibleAt - now, // {7} partitioned ScheduledAt - now, // {8} partitioned Processing VisibleAt - batchSize, // {9} LIMIT - now, // {10} partition_heads VisibleAt - now, // {11} partition_heads ScheduledAt - now // {12} partition_heads Processing VisibleAt + queueKey.Key, // {1} outer SELECT queue filter + now, // {2} unpartitioned VisibleAt + now, // {3} unpartitioned ScheduledAt + now, // {4} unpartitioned Processing VisibleAt + now, // {5} partitioned VisibleAt + now, // {6} partitioned ScheduledAt + now, // {7} partitioned Processing VisibleAt + batchSize // {8} LIMIT ); } diff --git a/src/Atomizer.EntityFrameworkCore/Providers/Sql/SqlServerDialect.cs b/src/Atomizer.EntityFrameworkCore/Providers/Sql/SqlServerDialect.cs index f0092d0..f09542a 100644 --- a/src/Atomizer.EntityFrameworkCore/Providers/Sql/SqlServerDialect.cs +++ b/src/Atomizer.EntityFrameworkCore/Providers/Sql/SqlServerDialect.cs @@ -17,62 +17,42 @@ SELECT DISTINCT {{_jPartitionKey}} {{_jStatus}} = {{_statusProcessing}} OR ({{_jStatus}} = {{_statusPending}} AND {{_jAttempts}} > 0) ) - ), - partition_heads AS ( - SELECT {{_jPartitionKey}}, MIN({{_jSequenceNumber}}) AS min_seq - FROM {{_jTable}} - WHERE {{_jQueueKey}} = {1} - AND {{_jPartitionKey}} IS NOT NULL - AND {{_jPartitionKey}} NOT IN (SELECT {{_jPartitionKey}} FROM blocked_partitions) - AND ( - ({{_jStatus}} = {{_statusPending}} - AND ({{_jVisibleAt}} IS NULL OR {{_jVisibleAt}} <= {9}) - AND {{_jScheduledAt}} <= {10}) - OR ({{_jStatus}} = {{_statusProcessing}} AND {{_jVisibleAt}} <= {11}) - ) - GROUP BY {{_jPartitionKey}} ) SELECT TOP({{batchSize}}) t.* FROM {{_jTable}} AS t WITH (UPDLOCK, READPAST, ROWLOCK) - LEFT JOIN partition_heads ph - ON t.{{_jPartitionKey}} = ph.{{_jPartitionKey}} - AND t.{{_jSequenceNumber}} = ph.min_seq - WHERE t.{{_jQueueKey}} = {2} + WHERE t.{{_jQueueKey}} = {1} AND ( (t.{{_jPartitionKey}} IS NULL AND ( (t.{{_jStatus}} = {{_statusPending}} - AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {3}) - AND t.{{_jScheduledAt}} <= {4}) - OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {5}) + AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {2}) + AND t.{{_jScheduledAt}} <= {3}) + OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {4}) ) ) OR - (t.{{_jPartitionKey}} IS NOT NULL AND ph.min_seq IS NOT NULL + (t.{{_jPartitionKey}} IS NOT NULL + AND t.{{_jPartitionKey}} NOT IN (SELECT {{_jPartitionKey}} FROM blocked_partitions) AND ( (t.{{_jStatus}} = {{_statusPending}} - AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {6}) - AND t.{{_jScheduledAt}} <= {7}) - OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {8}) + AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {5}) + AND t.{{_jScheduledAt}} <= {6}) + OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {7}) ) ) ) - ORDER BY t.{{_jScheduledAt}}, t.{{_jId}}; + ORDER BY t.{{_jScheduledAt}}, t.{{_jSequenceNumber}}, t.{{_jId}}; """; return FormattableStringFactory.Create( format, - queueKey.Key, // {0} blocked_partitions queue filter - queueKey.Key, // {1} partition_heads queue filter - queueKey.Key, // {2} outer SELECT queue filter - now, // {3} unpartitioned VisibleAt - now, // {4} unpartitioned ScheduledAt - now, // {5} unpartitioned Processing VisibleAt - now, // {6} partitioned VisibleAt - now, // {7} partitioned ScheduledAt - now, // {8} partitioned Processing VisibleAt - now, // {9} partition_heads VisibleAt (batchSize is TOP(batchSize) inlined, not a placeholder) - now, // {10} partition_heads ScheduledAt - now // {11} partition_heads Processing VisibleAt + queueKey.Key, // {0} blocked_partitions queue filter (batchSize is TOP(batchSize) inlined, not a placeholder) + queueKey.Key, // {1} outer SELECT queue filter + now, // {2} unpartitioned VisibleAt + now, // {3} unpartitioned ScheduledAt + now, // {4} unpartitioned Processing VisibleAt + now, // {5} partitioned VisibleAt + now, // {6} partitioned ScheduledAt + now // {7} partitioned Processing VisibleAt ); } diff --git a/src/Atomizer.EntityFrameworkCore/Storage/EntityFrameworkCoreStorage.cs b/src/Atomizer.EntityFrameworkCore/Storage/EntityFrameworkCoreStorage.cs index 4423838..8554c59 100644 --- a/src/Atomizer.EntityFrameworkCore/Storage/EntityFrameworkCoreStorage.cs +++ b/src/Atomizer.EntityFrameworkCore/Storage/EntityFrameworkCoreStorage.cs @@ -254,32 +254,21 @@ CancellationToken cancellationToken .Select(j => j.PartitionKey!) .ToHashSet(); - // 2) Find the lowest sequence number per unblocked partition (partition heads). - // Only consider Pending jobs that are due — Completed and Failed jobs must not - // block the next job from becoming the partition head. - var partitionHeads = allForQueue - .Where(j => - j.PartitionKey != null - && !blockedPartitions.Contains(j.PartitionKey) - && j.Status == AtomizerEntityJobStatus.Pending - && (j.VisibleAt == null || j.VisibleAt <= now) - && j.ScheduledAt <= now - ) - .GroupBy(j => j.PartitionKey!) - .Select(g => g.OrderBy(j => j.SequenceNumber).First().Id) - .ToHashSet(); - - // 3) Apply eligibility filter, FIFO partition-head filter, and batch size limit. + // 2) Apply eligibility filter, blocking filter, and batch size limit. + // All jobs from unblocked partitions are eligible; sequence ordering + // ensures FIFO delivery within each partition. return allForQueue .Where(j => ( - j.Status == AtomizerEntityJobStatus.Pending + ( + j.Status == AtomizerEntityJobStatus.Pending && (j.VisibleAt == null || j.VisibleAt <= now) && j.ScheduledAt <= now - || (j.Status == AtomizerEntityJobStatus.Processing && j.VisibleAt <= now) // lease expired - ) && (j.PartitionKey == null || partitionHeads.Contains(j.Id)) + ) || (j.Status == AtomizerEntityJobStatus.Processing && j.VisibleAt <= now) // lease expired + ) && (j.PartitionKey == null || !blockedPartitions.Contains(j.PartitionKey)) ) .OrderBy(j => j.ScheduledAt) + .ThenBy(j => j.SequenceNumber) .Take(batchSize) .Select(j => j.ToAtomizerJob()) .ToList(); diff --git a/src/Atomizer/Abstractions/IAtomizerStorage.cs b/src/Atomizer/Abstractions/IAtomizerStorage.cs index 52868c1..610a130 100644 --- a/src/Atomizer/Abstractions/IAtomizerStorage.cs +++ b/src/Atomizer/Abstractions/IAtomizerStorage.cs @@ -39,9 +39,10 @@ public interface IAtomizerStorage /// /// When partition keys are in use, this method enforces FIFO ordering: /// - /// At most one job per (queue, partition key) is returned — the job with the lowest sequence number. /// A partition is excluded entirely if any job within it is /// or with prior attempts (Attempts > 0). + /// All eligible jobs from unblocked partitions are returned (up to ), + /// allowing multiple jobs from the same partition to be leased in a single sweep for higher throughput. /// Jobs without a partition key are unaffected and returned normally alongside partitioned jobs. /// /// diff --git a/src/Atomizer/Storage/InMemoryStorage.cs b/src/Atomizer/Storage/InMemoryStorage.cs index a749bbe..49d8d56 100644 --- a/src/Atomizer/Storage/InMemoryStorage.cs +++ b/src/Atomizer/Storage/InMemoryStorage.cs @@ -145,15 +145,9 @@ CancellationToken cancellationToken ) .Select(j => j!); - var unpartitioned = eligible.Where(j => j.PartitionKey == null); - var partitionHeads = eligible - .Where(j => j.PartitionKey != null) - .GroupBy(j => j.PartitionKey!.Key) - .Select(g => g.OrderBy(j => j.SequenceNumber).First()); - - var candidates = unpartitioned - .Concat(partitionHeads) + var candidates = eligible .OrderBy(j => j.ScheduledAt) + .ThenBy(j => j.SequenceNumber) .ThenBy(j => j.CreatedAt) .Take(Math.Max(0, batchSize)) .ToList(); diff --git a/tests/Atomizer.Tests.Utilities/StorageContract/AtomizerStorageContractTests.cs b/tests/Atomizer.Tests.Utilities/StorageContract/AtomizerStorageContractTests.cs index d55c2fb..edcee19 100644 --- a/tests/Atomizer.Tests.Utilities/StorageContract/AtomizerStorageContractTests.cs +++ b/tests/Atomizer.Tests.Utilities/StorageContract/AtomizerStorageContractTests.cs @@ -108,15 +108,15 @@ public async Task InsertAsync_WhenIdempotencyKeyCollision_ShouldAssignExistingSe } // ------------------------------------------------------------------ - // FIFO-07: GetDueJobsAsync returns at most one job per partition + // FIFO-07: GetDueJobsAsync returns all eligible jobs from unblocked partitions // ------------------------------------------------------------------ /// - /// FIFO-07: When multiple jobs share a partition key, only the lowest-sequence-number - /// job is returned by . + /// FIFO-07: When multiple jobs share a partition key, all of them are returned in + /// sequence-number order so a single sweep can process the whole partition. /// [Fact] - public async Task GetDueJobsAsync_WhenMultipleJobsInSamePartition_ShouldReturnOnlyLowestSequenceNumber() + public async Task GetDueJobsAsync_WhenMultipleJobsInSamePartition_ShouldReturnAllInSequenceOrder() { // Arrange var partitionKey = new PartitionKey("batch-key"); @@ -129,9 +129,32 @@ public async Task GetDueJobsAsync_WhenMultipleJobsInSamePartition_ShouldReturnOn // Act var result = await _sut.GetDueJobsAsync(QueueKey.Default, _now, batchSize: 10, CancellationToken.None); - // Assert - result.Should().HaveCount(1); - result.Single().Id.Should().Be(job1.Id); + // Assert — both jobs are returned; job1 (lower sequence) comes first + result.Should().HaveCount(2); + result[0].Id.Should().Be(job1.Id); + result[1].Id.Should().Be(job2.Id); + } + + /// + /// FIFO-07: The batch size cap applies across all eligible jobs regardless of partition. + /// + [Fact] + public async Task GetDueJobsAsync_WhenBatchSizeSmallerThanPartitionJobs_ShouldRespectBatchSize() + { + // Arrange + var partitionKey = new PartitionKey("big-partition"); + var jobs = Enumerable.Range(0, 5).Select(_ => CreateJob(partitionKey: partitionKey)).ToList(); + foreach (var job in jobs) + await _sut.InsertAsync(job, CancellationToken.None); + + // Act + var result = await _sut.GetDueJobsAsync(QueueKey.Default, _now, batchSize: 3, CancellationToken.None); + + // Assert — capped at batchSize, lowest sequence numbers first + result.Should().HaveCount(3); + result[0].Id.Should().Be(jobs[0].Id); + result[1].Id.Should().Be(jobs[1].Id); + result[2].Id.Should().Be(jobs[2].Id); } /// @@ -267,8 +290,9 @@ public async Task ReleaseLeasedAsync_WhenPartitionHeadReleased_ShouldUnblockPart await _sut.ReleaseLeasedAsync(leaseToken, _now, CancellationToken.None); var unblocked = await _sut.GetDueJobsAsync(QueueKey.Default, _now, batchSize: 10, CancellationToken.None); - unblocked.Should().HaveCount(1); - unblocked.Single().Id.Should().Be(job1.Id); + unblocked.Should().HaveCount(2); + unblocked[0].Id.Should().Be(job1.Id); + unblocked[1].Id.Should().Be(job2.Id); } // ------------------------------------------------------------------ diff --git a/tests/Atomizer.Tests/Storage/InMemoryStorageTests.cs b/tests/Atomizer.Tests/Storage/InMemoryStorageTests.cs index 81cbf39..a6e9af6 100644 --- a/tests/Atomizer.Tests/Storage/InMemoryStorageTests.cs +++ b/tests/Atomizer.Tests/Storage/InMemoryStorageTests.cs @@ -381,7 +381,7 @@ public async Task InsertAsync_WhenIdempotencyKeyCollision_ShouldNotIncreaseJobCo // ---- FIFO-07/FIFO-08: GetDueJobsAsync partition blocking ---- [Fact] - public async Task GetDueJobsAsync_WhenTwoJobsSharePartition_ShouldReturnOnlyLowestSequenceNumber() + public async Task GetDueJobsAsync_WhenTwoJobsSharePartition_ShouldReturnBothInSequenceOrder() { // Arrange var pk = new PartitionKey("fifo-batch"); @@ -393,9 +393,10 @@ public async Task GetDueJobsAsync_WhenTwoJobsSharePartition_ShouldReturnOnlyLowe // Act var result = await _sut.GetDueJobsAsync(QueueKey.Default, _now, 10, CancellationToken.None); - // Assert — only head of partition returned - result.Should().HaveCount(1); + // Assert — all partition jobs returned in sequence order + result.Should().HaveCount(2); result[0].Id.Should().Be(job1.Id); + result[1].Id.Should().Be(job2.Id); } [Fact] From 5e5852ae3a5e91c0ac2e066ab392d43db4e67ca3 Mon Sep 17 00:00:00 2001 From: mnbuhl Date: Mon, 4 May 2026 23:18:45 +0200 Subject: [PATCH 2/4] refactor: remove narrating comments from EF Core LINQ fallback Co-Authored-By: Claude Sonnet 4.6 --- .../Storage/EntityFrameworkCoreStorage.cs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/Atomizer.EntityFrameworkCore/Storage/EntityFrameworkCoreStorage.cs b/src/Atomizer.EntityFrameworkCore/Storage/EntityFrameworkCoreStorage.cs index 8554c59..0c946ff 100644 --- a/src/Atomizer.EntityFrameworkCore/Storage/EntityFrameworkCoreStorage.cs +++ b/src/Atomizer.EntityFrameworkCore/Storage/EntityFrameworkCoreStorage.cs @@ -241,8 +241,6 @@ CancellationToken cancellationToken .Where(j => j.QueueKey == queueKey.Key) .ToListAsync(cancellationToken); - // 1) Collect blocked partitions: any partition key with a Processing job - // or a Pending job with prior attempts (retrying). var blockedPartitions = allForQueue .Where(j => j.PartitionKey != null @@ -254,9 +252,6 @@ CancellationToken cancellationToken .Select(j => j.PartitionKey!) .ToHashSet(); - // 2) Apply eligibility filter, blocking filter, and batch size limit. - // All jobs from unblocked partitions are eligible; sequence ordering - // ensures FIFO delivery within each partition. return allForQueue .Where(j => ( From 883aa26e4812ab99212614d2c98618385e885f85 Mon Sep 17 00:00:00 2001 From: mnbuhl Date: Mon, 4 May 2026 23:23:11 +0200 Subject: [PATCH 3/4] test: widen flaky grace-period timing upper bound to 2.5s 500ms slack over a 1s grace period is too tight for CI. The key invariant is that StopAsync returns well before the 3s worker finishes; 2.5s proves that without being brittle. Co-Authored-By: Claude Sonnet 4.6 --- tests/Atomizer.Tests/Processing/QueuePumpTests.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/Atomizer.Tests/Processing/QueuePumpTests.cs b/tests/Atomizer.Tests/Processing/QueuePumpTests.cs index 50a0653..70d0479 100644 --- a/tests/Atomizer.Tests/Processing/QueuePumpTests.cs +++ b/tests/Atomizer.Tests/Processing/QueuePumpTests.cs @@ -137,12 +137,12 @@ public async Task StopAsync_WhenWorkerIsLongRunning_ShouldRespectGracePeriod() await pump.StopAsync(TimeSpan.FromSeconds(1), CancellationToken.None); sw.Stop(); - // Assert: Should return after about 1 second, not 2 + // Assert: Should return after the grace period (1s), well before the worker finishes (3s) sw.Elapsed.TotalSeconds.Should().BeGreaterThanOrEqualTo(1); sw.Elapsed.TotalSeconds.Should() - .BeLessThanOrEqualTo( - 1.5, - $"StopAsync should return after about 1 second, elapsed: {sw.Elapsed.TotalSeconds}" + .BeLessThan( + 2.5, + $"StopAsync should return after the grace period, not wait for the worker, elapsed: {sw.Elapsed.TotalSeconds}" ); logger.Received(1).LogInformation($"Stopping queue '{queueOptions.QueueKey}'..."); From e1c471be4ca2d6fce681aa019030d5878cbea8b6 Mon Sep 17 00:00:00 2001 From: mnbuhl Date: Mon, 4 May 2026 23:31:00 +0200 Subject: [PATCH 4/4] revert: restore one-job-per-partition FIFO semantics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Returning multiple jobs from the same partition per sweep breaks the documented invariant: a later job can be leased and executed before an earlier one finishes retrying, reordering side effects. All partition-logic changes are reverted. Only the flaky timing test fix (grace-period upper bound 1.5s → 2.5s) remains in this PR. Co-Authored-By: Claude Sonnet 4.6 --- .../Providers/Sql/MySqlDialect.cs | 59 +++++++++++++------ .../Providers/Sql/PostgreSqlDialect.cs | 58 ++++++++++++------ .../Providers/Sql/SqlServerDialect.cs | 56 ++++++++++++------ .../Storage/EntityFrameworkCoreStorage.cs | 26 ++++++-- src/Atomizer/Abstractions/IAtomizerStorage.cs | 3 +- src/Atomizer/Storage/InMemoryStorage.cs | 10 +++- .../AtomizerStorageContractTests.cs | 42 +++---------- .../Storage/InMemoryStorageTests.cs | 7 +-- 8 files changed, 159 insertions(+), 102 deletions(-) diff --git a/src/Atomizer.EntityFrameworkCore/Providers/Sql/MySqlDialect.cs b/src/Atomizer.EntityFrameworkCore/Providers/Sql/MySqlDialect.cs index f4a6df3..0db2aca 100644 --- a/src/Atomizer.EntityFrameworkCore/Providers/Sql/MySqlDialect.cs +++ b/src/Atomizer.EntityFrameworkCore/Providers/Sql/MySqlDialect.cs @@ -17,45 +17,66 @@ SELECT DISTINCT {{_jPartitionKey}} {{_jStatus}} = {{_statusProcessing}} OR ({{_jStatus}} = {{_statusPending}} AND {{_jAttempts}} > 0) ) + ), + partition_heads AS ( + SELECT j.{{_jPartitionKey}}, MIN(j.{{_jSequenceNumber}}) AS min_seq + FROM {{_jTable}} AS j + LEFT JOIN blocked_partitions bp ON j.{{_jPartitionKey}} = bp.{{_jPartitionKey}} + WHERE j.{{_jQueueKey}} = {1} + AND j.{{_jPartitionKey}} IS NOT NULL + AND bp.{{_jPartitionKey}} IS NULL + AND ( + (j.{{_jStatus}} = {{_statusPending}} + AND (j.{{_jVisibleAt}} IS NULL OR j.{{_jVisibleAt}} <= {10}) + AND j.{{_jScheduledAt}} <= {11}) + OR (j.{{_jStatus}} = {{_statusProcessing}} AND j.{{_jVisibleAt}} <= {12}) + ) + GROUP BY j.{{_jPartitionKey}} ) SELECT t.* FROM {{_jTable}} AS t - WHERE t.{{_jQueueKey}} = {1} + LEFT JOIN partition_heads ph + ON t.{{_jPartitionKey}} = ph.{{_jPartitionKey}} + AND t.{{_jSequenceNumber}} = ph.min_seq + WHERE t.{{_jQueueKey}} = {2} AND ( (t.{{_jPartitionKey}} IS NULL AND ( (t.{{_jStatus}} = {{_statusPending}} - AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {2}) - AND t.{{_jScheduledAt}} <= {3}) - OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {4}) + AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {3}) + AND t.{{_jScheduledAt}} <= {4}) + OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {5}) ) ) OR - (t.{{_jPartitionKey}} IS NOT NULL - AND t.{{_jPartitionKey}} NOT IN (SELECT {{_jPartitionKey}} FROM blocked_partitions) + (t.{{_jPartitionKey}} IS NOT NULL AND ph.min_seq IS NOT NULL AND ( (t.{{_jStatus}} = {{_statusPending}} - AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {5}) - AND t.{{_jScheduledAt}} <= {6}) - OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {7}) + AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {6}) + AND t.{{_jScheduledAt}} <= {7}) + OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {8}) ) ) ) - ORDER BY t.{{_jScheduledAt}}, t.{{_jSequenceNumber}}, t.{{_jId}} - LIMIT {8} + ORDER BY t.{{_jScheduledAt}}, t.{{_jId}} + LIMIT {9} FOR UPDATE SKIP LOCKED; """; return FormattableStringFactory.Create( format, queueKey.Key, // {0} blocked_partitions queue filter - queueKey.Key, // {1} outer SELECT queue filter - now, // {2} unpartitioned VisibleAt - now, // {3} unpartitioned ScheduledAt - now, // {4} unpartitioned Processing VisibleAt - now, // {5} partitioned VisibleAt - now, // {6} partitioned ScheduledAt - now, // {7} partitioned Processing VisibleAt - batchSize // {8} LIMIT + queueKey.Key, // {1} partition_heads queue filter + queueKey.Key, // {2} outer SELECT queue filter + now, // {3} unpartitioned VisibleAt + now, // {4} unpartitioned ScheduledAt + now, // {5} unpartitioned Processing VisibleAt + now, // {6} partitioned VisibleAt + now, // {7} partitioned ScheduledAt + now, // {8} partitioned Processing VisibleAt + batchSize, // {9} LIMIT + now, // {10} partition_heads VisibleAt + now, // {11} partition_heads ScheduledAt + now // {12} partition_heads Processing VisibleAt ); } diff --git a/src/Atomizer.EntityFrameworkCore/Providers/Sql/PostgreSqlDialect.cs b/src/Atomizer.EntityFrameworkCore/Providers/Sql/PostgreSqlDialect.cs index 22cd2e2..912480e 100644 --- a/src/Atomizer.EntityFrameworkCore/Providers/Sql/PostgreSqlDialect.cs +++ b/src/Atomizer.EntityFrameworkCore/Providers/Sql/PostgreSqlDialect.cs @@ -17,45 +17,65 @@ SELECT DISTINCT {{_jPartitionKey}} {{_jStatus}} = {{_statusProcessing}} OR ({{_jStatus}} = {{_statusPending}} AND {{_jAttempts}} > 0) ) + ), + partition_heads AS ( + SELECT {{_jPartitionKey}}, MIN({{_jSequenceNumber}}) AS min_seq + FROM {{_jTable}} + WHERE {{_jQueueKey}} = {1} + AND {{_jPartitionKey}} IS NOT NULL + AND {{_jPartitionKey}} NOT IN (SELECT {{_jPartitionKey}} FROM blocked_partitions) + AND ( + ({{_jStatus}} = {{_statusPending}} + AND ({{_jVisibleAt}} IS NULL OR {{_jVisibleAt}} <= {10}) + AND {{_jScheduledAt}} <= {11}) + OR ({{_jStatus}} = {{_statusProcessing}} AND {{_jVisibleAt}} <= {12}) + ) + GROUP BY {{_jPartitionKey}} ) SELECT t.* FROM {{_jTable}} AS t - WHERE t.{{_jQueueKey}} = {1} + LEFT JOIN partition_heads ph + ON t.{{_jPartitionKey}} = ph.{{_jPartitionKey}} + AND t.{{_jSequenceNumber}} = ph.min_seq + WHERE t.{{_jQueueKey}} = {2} AND ( (t.{{_jPartitionKey}} IS NULL AND ( (t.{{_jStatus}} = {{_statusPending}} - AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {2}) - AND t.{{_jScheduledAt}} <= {3}) - OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {4}) + AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {3}) + AND t.{{_jScheduledAt}} <= {4}) + OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {5}) ) ) OR - (t.{{_jPartitionKey}} IS NOT NULL - AND t.{{_jPartitionKey}} NOT IN (SELECT {{_jPartitionKey}} FROM blocked_partitions) + (t.{{_jPartitionKey}} IS NOT NULL AND ph.min_seq IS NOT NULL AND ( (t.{{_jStatus}} = {{_statusPending}} - AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {5}) - AND t.{{_jScheduledAt}} <= {6}) - OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {7}) + AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {6}) + AND t.{{_jScheduledAt}} <= {7}) + OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {8}) ) ) ) - ORDER BY t.{{_jScheduledAt}}, t.{{_jSequenceNumber}}, t.{{_jId}} - LIMIT {8} + ORDER BY t.{{_jScheduledAt}}, t.{{_jId}} + LIMIT {9} FOR NO KEY UPDATE SKIP LOCKED; """; return FormattableStringFactory.Create( format, queueKey.Key, // {0} blocked_partitions queue filter - queueKey.Key, // {1} outer SELECT queue filter - now, // {2} unpartitioned VisibleAt - now, // {3} unpartitioned ScheduledAt - now, // {4} unpartitioned Processing VisibleAt - now, // {5} partitioned VisibleAt - now, // {6} partitioned ScheduledAt - now, // {7} partitioned Processing VisibleAt - batchSize // {8} LIMIT + queueKey.Key, // {1} partition_heads queue filter + queueKey.Key, // {2} outer SELECT queue filter + now, // {3} unpartitioned VisibleAt + now, // {4} unpartitioned ScheduledAt + now, // {5} unpartitioned Processing VisibleAt + now, // {6} partitioned VisibleAt + now, // {7} partitioned ScheduledAt + now, // {8} partitioned Processing VisibleAt + batchSize, // {9} LIMIT + now, // {10} partition_heads VisibleAt + now, // {11} partition_heads ScheduledAt + now // {12} partition_heads Processing VisibleAt ); } diff --git a/src/Atomizer.EntityFrameworkCore/Providers/Sql/SqlServerDialect.cs b/src/Atomizer.EntityFrameworkCore/Providers/Sql/SqlServerDialect.cs index f09542a..f0092d0 100644 --- a/src/Atomizer.EntityFrameworkCore/Providers/Sql/SqlServerDialect.cs +++ b/src/Atomizer.EntityFrameworkCore/Providers/Sql/SqlServerDialect.cs @@ -17,42 +17,62 @@ SELECT DISTINCT {{_jPartitionKey}} {{_jStatus}} = {{_statusProcessing}} OR ({{_jStatus}} = {{_statusPending}} AND {{_jAttempts}} > 0) ) + ), + partition_heads AS ( + SELECT {{_jPartitionKey}}, MIN({{_jSequenceNumber}}) AS min_seq + FROM {{_jTable}} + WHERE {{_jQueueKey}} = {1} + AND {{_jPartitionKey}} IS NOT NULL + AND {{_jPartitionKey}} NOT IN (SELECT {{_jPartitionKey}} FROM blocked_partitions) + AND ( + ({{_jStatus}} = {{_statusPending}} + AND ({{_jVisibleAt}} IS NULL OR {{_jVisibleAt}} <= {9}) + AND {{_jScheduledAt}} <= {10}) + OR ({{_jStatus}} = {{_statusProcessing}} AND {{_jVisibleAt}} <= {11}) + ) + GROUP BY {{_jPartitionKey}} ) SELECT TOP({{batchSize}}) t.* FROM {{_jTable}} AS t WITH (UPDLOCK, READPAST, ROWLOCK) - WHERE t.{{_jQueueKey}} = {1} + LEFT JOIN partition_heads ph + ON t.{{_jPartitionKey}} = ph.{{_jPartitionKey}} + AND t.{{_jSequenceNumber}} = ph.min_seq + WHERE t.{{_jQueueKey}} = {2} AND ( (t.{{_jPartitionKey}} IS NULL AND ( (t.{{_jStatus}} = {{_statusPending}} - AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {2}) - AND t.{{_jScheduledAt}} <= {3}) - OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {4}) + AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {3}) + AND t.{{_jScheduledAt}} <= {4}) + OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {5}) ) ) OR - (t.{{_jPartitionKey}} IS NOT NULL - AND t.{{_jPartitionKey}} NOT IN (SELECT {{_jPartitionKey}} FROM blocked_partitions) + (t.{{_jPartitionKey}} IS NOT NULL AND ph.min_seq IS NOT NULL AND ( (t.{{_jStatus}} = {{_statusPending}} - AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {5}) - AND t.{{_jScheduledAt}} <= {6}) - OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {7}) + AND (t.{{_jVisibleAt}} IS NULL OR t.{{_jVisibleAt}} <= {6}) + AND t.{{_jScheduledAt}} <= {7}) + OR (t.{{_jStatus}} = {{_statusProcessing}} AND t.{{_jVisibleAt}} <= {8}) ) ) ) - ORDER BY t.{{_jScheduledAt}}, t.{{_jSequenceNumber}}, t.{{_jId}}; + ORDER BY t.{{_jScheduledAt}}, t.{{_jId}}; """; return FormattableStringFactory.Create( format, - queueKey.Key, // {0} blocked_partitions queue filter (batchSize is TOP(batchSize) inlined, not a placeholder) - queueKey.Key, // {1} outer SELECT queue filter - now, // {2} unpartitioned VisibleAt - now, // {3} unpartitioned ScheduledAt - now, // {4} unpartitioned Processing VisibleAt - now, // {5} partitioned VisibleAt - now, // {6} partitioned ScheduledAt - now // {7} partitioned Processing VisibleAt + queueKey.Key, // {0} blocked_partitions queue filter + queueKey.Key, // {1} partition_heads queue filter + queueKey.Key, // {2} outer SELECT queue filter + now, // {3} unpartitioned VisibleAt + now, // {4} unpartitioned ScheduledAt + now, // {5} unpartitioned Processing VisibleAt + now, // {6} partitioned VisibleAt + now, // {7} partitioned ScheduledAt + now, // {8} partitioned Processing VisibleAt + now, // {9} partition_heads VisibleAt (batchSize is TOP(batchSize) inlined, not a placeholder) + now, // {10} partition_heads ScheduledAt + now // {11} partition_heads Processing VisibleAt ); } diff --git a/src/Atomizer.EntityFrameworkCore/Storage/EntityFrameworkCoreStorage.cs b/src/Atomizer.EntityFrameworkCore/Storage/EntityFrameworkCoreStorage.cs index 0c946ff..4423838 100644 --- a/src/Atomizer.EntityFrameworkCore/Storage/EntityFrameworkCoreStorage.cs +++ b/src/Atomizer.EntityFrameworkCore/Storage/EntityFrameworkCoreStorage.cs @@ -241,6 +241,8 @@ CancellationToken cancellationToken .Where(j => j.QueueKey == queueKey.Key) .ToListAsync(cancellationToken); + // 1) Collect blocked partitions: any partition key with a Processing job + // or a Pending job with prior attempts (retrying). var blockedPartitions = allForQueue .Where(j => j.PartitionKey != null @@ -252,18 +254,32 @@ CancellationToken cancellationToken .Select(j => j.PartitionKey!) .ToHashSet(); + // 2) Find the lowest sequence number per unblocked partition (partition heads). + // Only consider Pending jobs that are due — Completed and Failed jobs must not + // block the next job from becoming the partition head. + var partitionHeads = allForQueue + .Where(j => + j.PartitionKey != null + && !blockedPartitions.Contains(j.PartitionKey) + && j.Status == AtomizerEntityJobStatus.Pending + && (j.VisibleAt == null || j.VisibleAt <= now) + && j.ScheduledAt <= now + ) + .GroupBy(j => j.PartitionKey!) + .Select(g => g.OrderBy(j => j.SequenceNumber).First().Id) + .ToHashSet(); + + // 3) Apply eligibility filter, FIFO partition-head filter, and batch size limit. return allForQueue .Where(j => ( - ( - j.Status == AtomizerEntityJobStatus.Pending + j.Status == AtomizerEntityJobStatus.Pending && (j.VisibleAt == null || j.VisibleAt <= now) && j.ScheduledAt <= now - ) || (j.Status == AtomizerEntityJobStatus.Processing && j.VisibleAt <= now) // lease expired - ) && (j.PartitionKey == null || !blockedPartitions.Contains(j.PartitionKey)) + || (j.Status == AtomizerEntityJobStatus.Processing && j.VisibleAt <= now) // lease expired + ) && (j.PartitionKey == null || partitionHeads.Contains(j.Id)) ) .OrderBy(j => j.ScheduledAt) - .ThenBy(j => j.SequenceNumber) .Take(batchSize) .Select(j => j.ToAtomizerJob()) .ToList(); diff --git a/src/Atomizer/Abstractions/IAtomizerStorage.cs b/src/Atomizer/Abstractions/IAtomizerStorage.cs index 610a130..52868c1 100644 --- a/src/Atomizer/Abstractions/IAtomizerStorage.cs +++ b/src/Atomizer/Abstractions/IAtomizerStorage.cs @@ -39,10 +39,9 @@ public interface IAtomizerStorage /// /// When partition keys are in use, this method enforces FIFO ordering: /// + /// At most one job per (queue, partition key) is returned — the job with the lowest sequence number. /// A partition is excluded entirely if any job within it is /// or with prior attempts (Attempts > 0). - /// All eligible jobs from unblocked partitions are returned (up to ), - /// allowing multiple jobs from the same partition to be leased in a single sweep for higher throughput. /// Jobs without a partition key are unaffected and returned normally alongside partitioned jobs. /// /// diff --git a/src/Atomizer/Storage/InMemoryStorage.cs b/src/Atomizer/Storage/InMemoryStorage.cs index 49d8d56..a749bbe 100644 --- a/src/Atomizer/Storage/InMemoryStorage.cs +++ b/src/Atomizer/Storage/InMemoryStorage.cs @@ -145,9 +145,15 @@ CancellationToken cancellationToken ) .Select(j => j!); - var candidates = eligible + var unpartitioned = eligible.Where(j => j.PartitionKey == null); + var partitionHeads = eligible + .Where(j => j.PartitionKey != null) + .GroupBy(j => j.PartitionKey!.Key) + .Select(g => g.OrderBy(j => j.SequenceNumber).First()); + + var candidates = unpartitioned + .Concat(partitionHeads) .OrderBy(j => j.ScheduledAt) - .ThenBy(j => j.SequenceNumber) .ThenBy(j => j.CreatedAt) .Take(Math.Max(0, batchSize)) .ToList(); diff --git a/tests/Atomizer.Tests.Utilities/StorageContract/AtomizerStorageContractTests.cs b/tests/Atomizer.Tests.Utilities/StorageContract/AtomizerStorageContractTests.cs index edcee19..d55c2fb 100644 --- a/tests/Atomizer.Tests.Utilities/StorageContract/AtomizerStorageContractTests.cs +++ b/tests/Atomizer.Tests.Utilities/StorageContract/AtomizerStorageContractTests.cs @@ -108,15 +108,15 @@ public async Task InsertAsync_WhenIdempotencyKeyCollision_ShouldAssignExistingSe } // ------------------------------------------------------------------ - // FIFO-07: GetDueJobsAsync returns all eligible jobs from unblocked partitions + // FIFO-07: GetDueJobsAsync returns at most one job per partition // ------------------------------------------------------------------ /// - /// FIFO-07: When multiple jobs share a partition key, all of them are returned in - /// sequence-number order so a single sweep can process the whole partition. + /// FIFO-07: When multiple jobs share a partition key, only the lowest-sequence-number + /// job is returned by . /// [Fact] - public async Task GetDueJobsAsync_WhenMultipleJobsInSamePartition_ShouldReturnAllInSequenceOrder() + public async Task GetDueJobsAsync_WhenMultipleJobsInSamePartition_ShouldReturnOnlyLowestSequenceNumber() { // Arrange var partitionKey = new PartitionKey("batch-key"); @@ -129,32 +129,9 @@ public async Task GetDueJobsAsync_WhenMultipleJobsInSamePartition_ShouldReturnAl // Act var result = await _sut.GetDueJobsAsync(QueueKey.Default, _now, batchSize: 10, CancellationToken.None); - // Assert — both jobs are returned; job1 (lower sequence) comes first - result.Should().HaveCount(2); - result[0].Id.Should().Be(job1.Id); - result[1].Id.Should().Be(job2.Id); - } - - /// - /// FIFO-07: The batch size cap applies across all eligible jobs regardless of partition. - /// - [Fact] - public async Task GetDueJobsAsync_WhenBatchSizeSmallerThanPartitionJobs_ShouldRespectBatchSize() - { - // Arrange - var partitionKey = new PartitionKey("big-partition"); - var jobs = Enumerable.Range(0, 5).Select(_ => CreateJob(partitionKey: partitionKey)).ToList(); - foreach (var job in jobs) - await _sut.InsertAsync(job, CancellationToken.None); - - // Act - var result = await _sut.GetDueJobsAsync(QueueKey.Default, _now, batchSize: 3, CancellationToken.None); - - // Assert — capped at batchSize, lowest sequence numbers first - result.Should().HaveCount(3); - result[0].Id.Should().Be(jobs[0].Id); - result[1].Id.Should().Be(jobs[1].Id); - result[2].Id.Should().Be(jobs[2].Id); + // Assert + result.Should().HaveCount(1); + result.Single().Id.Should().Be(job1.Id); } /// @@ -290,9 +267,8 @@ public async Task ReleaseLeasedAsync_WhenPartitionHeadReleased_ShouldUnblockPart await _sut.ReleaseLeasedAsync(leaseToken, _now, CancellationToken.None); var unblocked = await _sut.GetDueJobsAsync(QueueKey.Default, _now, batchSize: 10, CancellationToken.None); - unblocked.Should().HaveCount(2); - unblocked[0].Id.Should().Be(job1.Id); - unblocked[1].Id.Should().Be(job2.Id); + unblocked.Should().HaveCount(1); + unblocked.Single().Id.Should().Be(job1.Id); } // ------------------------------------------------------------------ diff --git a/tests/Atomizer.Tests/Storage/InMemoryStorageTests.cs b/tests/Atomizer.Tests/Storage/InMemoryStorageTests.cs index a6e9af6..81cbf39 100644 --- a/tests/Atomizer.Tests/Storage/InMemoryStorageTests.cs +++ b/tests/Atomizer.Tests/Storage/InMemoryStorageTests.cs @@ -381,7 +381,7 @@ public async Task InsertAsync_WhenIdempotencyKeyCollision_ShouldNotIncreaseJobCo // ---- FIFO-07/FIFO-08: GetDueJobsAsync partition blocking ---- [Fact] - public async Task GetDueJobsAsync_WhenTwoJobsSharePartition_ShouldReturnBothInSequenceOrder() + public async Task GetDueJobsAsync_WhenTwoJobsSharePartition_ShouldReturnOnlyLowestSequenceNumber() { // Arrange var pk = new PartitionKey("fifo-batch"); @@ -393,10 +393,9 @@ public async Task GetDueJobsAsync_WhenTwoJobsSharePartition_ShouldReturnBothInSe // Act var result = await _sut.GetDueJobsAsync(QueueKey.Default, _now, 10, CancellationToken.None); - // Assert — all partition jobs returned in sequence order - result.Should().HaveCount(2); + // Assert — only head of partition returned + result.Should().HaveCount(1); result[0].Id.Should().Be(job1.Id); - result[1].Id.Should().Be(job2.Id); } [Fact]