diff --git a/samples/Atomizer.EFCore.Example/Data/MySql/Migrations/20260504195004_Initial.Designer.cs b/samples/Atomizer.EFCore.Example/Data/MySql/Migrations/20260504195004_Initial.Designer.cs index c9033f0..63a5ccb 100644 --- a/samples/Atomizer.EFCore.Example/Data/MySql/Migrations/20260504195004_Initial.Designer.cs +++ b/samples/Atomizer.EFCore.Example/Data/MySql/Migrations/20260504195004_Initial.Designer.cs @@ -186,6 +186,10 @@ protected override void BuildTargetModel(ModelBuilder modelBuilder) b.Property("Enabled") .HasColumnType("tinyint(1)"); + b.Property("PartitionKey") + .HasMaxLength(255) + .HasColumnType("varchar(255)"); + b.Property("JobKey") .IsRequired() .HasMaxLength(255) diff --git a/samples/Atomizer.EFCore.Example/Data/MySql/Migrations/20260504195004_Initial.cs b/samples/Atomizer.EFCore.Example/Data/MySql/Migrations/20260504195004_Initial.cs index 384ae7a..4dc5ffd 100644 --- a/samples/Atomizer.EFCore.Example/Data/MySql/Migrations/20260504195004_Initial.cs +++ b/samples/Atomizer.EFCore.Example/Data/MySql/Migrations/20260504195004_Initial.cs @@ -104,6 +104,9 @@ protected override void Up(MigrationBuilder migrationBuilder) MisfirePolicy = table.Column(type: "int", nullable: false), MaxCatchUp = table.Column(type: "int", nullable: false), Enabled = table.Column(type: "tinyint(1)", nullable: false), + PartitionKey = table + .Column(type: "varchar(255)", maxLength: 255, nullable: true) + .Annotation("MySql:CharSet", "utf8mb4"), RetryIntervals = table .Column(type: "varchar(4096)", maxLength: 4096, nullable: false) .Annotation("MySql:CharSet", "utf8mb4"), diff --git a/samples/Atomizer.EFCore.Example/Data/MySql/Migrations/ExampleMySqlContextModelSnapshot.cs b/samples/Atomizer.EFCore.Example/Data/MySql/Migrations/ExampleMySqlContextModelSnapshot.cs index 0143624..21aa76e 100644 --- a/samples/Atomizer.EFCore.Example/Data/MySql/Migrations/ExampleMySqlContextModelSnapshot.cs +++ b/samples/Atomizer.EFCore.Example/Data/MySql/Migrations/ExampleMySqlContextModelSnapshot.cs @@ -183,6 +183,10 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.Property("Enabled") .HasColumnType("tinyint(1)"); + b.Property("PartitionKey") + .HasMaxLength(255) + .HasColumnType("varchar(255)"); + b.Property("JobKey") .IsRequired() .HasMaxLength(255) diff --git a/samples/Atomizer.EFCore.Example/Data/Postgres/Migrations/20260504194951_Initial.Designer.cs b/samples/Atomizer.EFCore.Example/Data/Postgres/Migrations/20260504194951_Initial.Designer.cs index dc93dc7..a8c62e7 100644 --- a/samples/Atomizer.EFCore.Example/Data/Postgres/Migrations/20260504194951_Initial.Designer.cs +++ b/samples/Atomizer.EFCore.Example/Data/Postgres/Migrations/20260504194951_Initial.Designer.cs @@ -186,6 +186,10 @@ protected override void BuildTargetModel(ModelBuilder modelBuilder) b.Property("Enabled") .HasColumnType("boolean"); + b.Property("PartitionKey") + .HasMaxLength(255) + .HasColumnType("character varying(255)"); + b.Property("JobKey") .IsRequired() .HasMaxLength(255) diff --git a/samples/Atomizer.EFCore.Example/Data/Postgres/Migrations/20260504194951_Initial.cs b/samples/Atomizer.EFCore.Example/Data/Postgres/Migrations/20260504194951_Initial.cs index 744ac0f..0390093 100644 --- a/samples/Atomizer.EFCore.Example/Data/Postgres/Migrations/20260504194951_Initial.cs +++ b/samples/Atomizer.EFCore.Example/Data/Postgres/Migrations/20260504194951_Initial.cs @@ -92,6 +92,7 @@ protected override void Up(MigrationBuilder migrationBuilder) MisfirePolicy = table.Column(type: "integer", nullable: false), MaxCatchUp = table.Column(type: "integer", nullable: false), Enabled = table.Column(type: "boolean", nullable: false), + PartitionKey = table.Column(type: "character varying(255)", maxLength: 255, nullable: true), RetryIntervals = table.Column( type: "character varying(4096)", maxLength: 4096, diff --git a/samples/Atomizer.EFCore.Example/Data/Postgres/Migrations/ExamplePostgresContextModelSnapshot.cs b/samples/Atomizer.EFCore.Example/Data/Postgres/Migrations/ExamplePostgresContextModelSnapshot.cs index 83a8336..d50726f 100644 --- a/samples/Atomizer.EFCore.Example/Data/Postgres/Migrations/ExamplePostgresContextModelSnapshot.cs +++ b/samples/Atomizer.EFCore.Example/Data/Postgres/Migrations/ExamplePostgresContextModelSnapshot.cs @@ -183,6 +183,10 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.Property("Enabled") .HasColumnType("boolean"); + b.Property("PartitionKey") + .HasMaxLength(255) + .HasColumnType("character varying(255)"); + b.Property("JobKey") .IsRequired() .HasMaxLength(255) diff --git a/samples/Atomizer.EFCore.Example/Data/SqlServer/Migrations/20260504194956_Initial.Designer.cs b/samples/Atomizer.EFCore.Example/Data/SqlServer/Migrations/20260504194956_Initial.Designer.cs index 856df16..a2172ab 100644 --- a/samples/Atomizer.EFCore.Example/Data/SqlServer/Migrations/20260504194956_Initial.Designer.cs +++ b/samples/Atomizer.EFCore.Example/Data/SqlServer/Migrations/20260504194956_Initial.Designer.cs @@ -186,6 +186,10 @@ protected override void BuildTargetModel(ModelBuilder modelBuilder) b.Property("Enabled") .HasColumnType("bit"); + b.Property("PartitionKey") + .HasMaxLength(255) + .HasColumnType("nvarchar(255)"); + b.Property("JobKey") .IsRequired() .HasMaxLength(255) diff --git a/samples/Atomizer.EFCore.Example/Data/SqlServer/Migrations/20260504194956_Initial.cs b/samples/Atomizer.EFCore.Example/Data/SqlServer/Migrations/20260504194956_Initial.cs index 97e8cec..097f394 100644 --- a/samples/Atomizer.EFCore.Example/Data/SqlServer/Migrations/20260504194956_Initial.cs +++ b/samples/Atomizer.EFCore.Example/Data/SqlServer/Migrations/20260504194956_Initial.cs @@ -72,6 +72,7 @@ protected override void Up(MigrationBuilder migrationBuilder) MisfirePolicy = table.Column(type: "int", nullable: false), MaxCatchUp = table.Column(type: "int", nullable: false), Enabled = table.Column(type: "bit", nullable: false), + PartitionKey = table.Column(type: "nvarchar(255)", maxLength: 255, nullable: true), RetryIntervals = table.Column(type: "nvarchar(max)", maxLength: 4096, nullable: false), NextRunAt = table.Column(type: "datetimeoffset", nullable: false), LastEnqueueAt = table.Column(type: "datetimeoffset", nullable: true), diff --git a/samples/Atomizer.EFCore.Example/Data/SqlServer/Migrations/ExampleSqlServerContextModelSnapshot.cs b/samples/Atomizer.EFCore.Example/Data/SqlServer/Migrations/ExampleSqlServerContextModelSnapshot.cs index 5410636..9571a29 100644 --- a/samples/Atomizer.EFCore.Example/Data/SqlServer/Migrations/ExampleSqlServerContextModelSnapshot.cs +++ b/samples/Atomizer.EFCore.Example/Data/SqlServer/Migrations/ExampleSqlServerContextModelSnapshot.cs @@ -183,6 +183,10 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.Property("Enabled") .HasColumnType("bit"); + b.Property("PartitionKey") + .HasMaxLength(255) + .HasColumnType("nvarchar(255)"); + b.Property("JobKey") .IsRequired() .HasMaxLength(255) diff --git a/samples/Atomizer.EFCore.Example/Data/Sqlite/Migrations/20260504195000_Initial.Designer.cs b/samples/Atomizer.EFCore.Example/Data/Sqlite/Migrations/20260504195000_Initial.Designer.cs index f7e59b1..3b15222 100644 --- a/samples/Atomizer.EFCore.Example/Data/Sqlite/Migrations/20260504195000_Initial.Designer.cs +++ b/samples/Atomizer.EFCore.Example/Data/Sqlite/Migrations/20260504195000_Initial.Designer.cs @@ -181,6 +181,10 @@ protected override void BuildTargetModel(ModelBuilder modelBuilder) b.Property("Enabled") .HasColumnType("INTEGER"); + b.Property("PartitionKey") + .HasMaxLength(255) + .HasColumnType("TEXT"); + b.Property("JobKey") .IsRequired() .HasMaxLength(255) diff --git a/samples/Atomizer.EFCore.Example/Data/Sqlite/Migrations/20260504195000_Initial.cs b/samples/Atomizer.EFCore.Example/Data/Sqlite/Migrations/20260504195000_Initial.cs index abe01b3..ec391d1 100644 --- a/samples/Atomizer.EFCore.Example/Data/Sqlite/Migrations/20260504195000_Initial.cs +++ b/samples/Atomizer.EFCore.Example/Data/Sqlite/Migrations/20260504195000_Initial.cs @@ -72,6 +72,7 @@ protected override void Up(MigrationBuilder migrationBuilder) MisfirePolicy = table.Column(type: "INTEGER", nullable: false), MaxCatchUp = table.Column(type: "INTEGER", nullable: false), Enabled = table.Column(type: "INTEGER", nullable: false), + PartitionKey = table.Column(type: "TEXT", maxLength: 255, nullable: true), RetryIntervals = table.Column(type: "TEXT", maxLength: 4096, nullable: false), NextRunAt = table.Column(type: "TEXT", nullable: false), LastEnqueueAt = table.Column(type: "TEXT", nullable: true), diff --git a/samples/Atomizer.EFCore.Example/Data/Sqlite/Migrations/ExampleSqliteContextModelSnapshot.cs b/samples/Atomizer.EFCore.Example/Data/Sqlite/Migrations/ExampleSqliteContextModelSnapshot.cs index 6e28db1..e0a32cf 100644 --- a/samples/Atomizer.EFCore.Example/Data/Sqlite/Migrations/ExampleSqliteContextModelSnapshot.cs +++ b/samples/Atomizer.EFCore.Example/Data/Sqlite/Migrations/ExampleSqliteContextModelSnapshot.cs @@ -178,6 +178,10 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.Property("Enabled") .HasColumnType("INTEGER"); + b.Property("PartitionKey") + .HasMaxLength(255) + .HasColumnType("TEXT"); + b.Property("JobKey") .IsRequired() .HasMaxLength(255) diff --git a/samples/Atomizer.EFCore.Example/Program.cs b/samples/Atomizer.EFCore.Example/Program.cs index a7ff760..8d3dd0b 100644 --- a/samples/Atomizer.EFCore.Example/Program.cs +++ b/samples/Atomizer.EFCore.Example/Program.cs @@ -87,7 +87,11 @@ await atomizer.ScheduleRecurringAsync( new LoggerJobPayload("Recurring job started", LogLevel.Information), "LoggerJobCatchUp", Schedule.Cron("0/5 * * * * *"), // Every 5 seconds, - options => options.MisfirePolicy = MisfirePolicy.CatchUp + options => + { + options.MisfirePolicy = MisfirePolicy.CatchUp; + options.PartitionKey = new PartitionKey("LoggerJobCatchUp"); + } ); app.MapPost( diff --git a/src/Atomizer.EntityFrameworkCore/Configurations/AtomizerScheduleEntityConfiguration.cs b/src/Atomizer.EntityFrameworkCore/Configurations/AtomizerScheduleEntityConfiguration.cs index 6ca7c55..9667576 100644 --- a/src/Atomizer.EntityFrameworkCore/Configurations/AtomizerScheduleEntityConfiguration.cs +++ b/src/Atomizer.EntityFrameworkCore/Configurations/AtomizerScheduleEntityConfiguration.cs @@ -39,6 +39,7 @@ public void Configure(EntityTypeBuilder builder) builder.Property(e => e.MisfirePolicy).IsRequired(); builder.Property(e => e.MaxCatchUp).IsRequired(); builder.Property(e => e.Enabled).IsRequired(); + builder.Property(e => e.PartitionKey).HasMaxLength(255).IsRequired(false); builder.Property(e => e.NextRunAt).IsRequired(); builder.Property(e => e.LastEnqueueAt); builder.Property(e => e.CreatedAt).IsRequired(); diff --git a/src/Atomizer.EntityFrameworkCore/Entities/AtomizerScheduleEntity.cs b/src/Atomizer.EntityFrameworkCore/Entities/AtomizerScheduleEntity.cs index ebcbf86..413d69c 100644 --- a/src/Atomizer.EntityFrameworkCore/Entities/AtomizerScheduleEntity.cs +++ b/src/Atomizer.EntityFrameworkCore/Entities/AtomizerScheduleEntity.cs @@ -35,6 +35,9 @@ public class AtomizerScheduleEntity /// Gets or sets whether this schedule is currently active. public bool Enabled { get; set; } = true; + /// Gets or sets the partition key forwarded to generated jobs, if any. + public string? PartitionKey { get; set; } + /// Gets or sets the serialized retry intervals for generated jobs. public TimeSpan[] RetryIntervals { get; set; } = []; @@ -90,6 +93,7 @@ public static AtomizerScheduleEntity ToEntity(this AtomizerSchedule schedule) MisfirePolicy = (MisfirePolicyEntity)(int)schedule.MisfirePolicy, MaxCatchUp = schedule.MaxCatchUp, Enabled = schedule.Enabled, + PartitionKey = schedule.PartitionKey?.Key, RetryIntervals = schedule.RetryStrategy.RetryIntervals, NextRunAt = schedule.NextRunAt, LastEnqueueAt = schedule.LastEnqueueAt, @@ -117,6 +121,7 @@ public static AtomizerSchedule ToAtomizerSchedule(this AtomizerScheduleEntity en MisfirePolicy = (MisfirePolicy)(int)entity.MisfirePolicy, MaxCatchUp = entity.MaxCatchUp, Enabled = entity.Enabled, + PartitionKey = entity.PartitionKey is null ? null : new PartitionKey(entity.PartitionKey), RetryStrategy = entity.RetryIntervals.Length == 0 ? RetryStrategy.None : RetryStrategy.Intervals(entity.RetryIntervals), NextRunAt = entity.NextRunAt, diff --git a/src/Atomizer.EntityFrameworkCore/Providers/Sql/BaseSqlDialect.cs b/src/Atomizer.EntityFrameworkCore/Providers/Sql/BaseSqlDialect.cs index 701782b..134ce4f 100644 --- a/src/Atomizer.EntityFrameworkCore/Providers/Sql/BaseSqlDialect.cs +++ b/src/Atomizer.EntityFrameworkCore/Providers/Sql/BaseSqlDialect.cs @@ -36,6 +36,7 @@ internal abstract class BaseSqlDialect : ISqlDialect protected readonly string _sMisfirePolicy; protected readonly string _sMaxCatchUp; protected readonly string _sEnabled; + protected readonly string _sPartitionKey; protected readonly string _sRetryIntervals; protected readonly string _sNextRunAt; protected readonly string _sLastEnqueueAt; @@ -78,6 +79,7 @@ protected BaseSqlDialect(EntityMap jobs, EntityMap schedules) _sMisfirePolicy = sc[nameof(AtomizerScheduleEntity.MisfirePolicy)]; _sMaxCatchUp = sc[nameof(AtomizerScheduleEntity.MaxCatchUp)]; _sEnabled = sc[nameof(AtomizerScheduleEntity.Enabled)]; + _sPartitionKey = sc[nameof(AtomizerScheduleEntity.PartitionKey)]; _sRetryIntervals = sc[nameof(AtomizerScheduleEntity.RetryIntervals)]; _sNextRunAt = sc[nameof(AtomizerScheduleEntity.NextRunAt)]; _sLastEnqueueAt = sc[nameof(AtomizerScheduleEntity.LastEnqueueAt)]; diff --git a/src/Atomizer.EntityFrameworkCore/Providers/Sql/MySqlDialect.cs b/src/Atomizer.EntityFrameworkCore/Providers/Sql/MySqlDialect.cs index 28ee4f9..0db2aca 100644 --- a/src/Atomizer.EntityFrameworkCore/Providers/Sql/MySqlDialect.cs +++ b/src/Atomizer.EntityFrameworkCore/Providers/Sql/MySqlDialect.cs @@ -150,6 +150,7 @@ public override FormattableString UpsertSchedule(AtomizerSchedule schedule, Date {{_sMisfirePolicy}}, {{_sMaxCatchUp}}, {{_sEnabled}}, + {{_sPartitionKey}}, {{_sRetryIntervals}}, {{_sNextRunAt}}, {{_sLastEnqueueAt}}, @@ -170,7 +171,8 @@ public override FormattableString UpsertSchedule(AtomizerSchedule schedule, Date {11}, {12}, {13}, - {14} + {14}, + {15} ) ON DUPLICATE KEY UPDATE {{_sQueueKey}} = VALUES({{_sQueueKey}}), @@ -181,9 +183,10 @@ ON DUPLICATE KEY UPDATE {{_sMisfirePolicy}} = VALUES({{_sMisfirePolicy}}), {{_sMaxCatchUp}} = VALUES({{_sMaxCatchUp}}), {{_sEnabled}} = VALUES({{_sEnabled}}), + {{_sPartitionKey}} = VALUES({{_sPartitionKey}}), {{_sRetryIntervals}} = VALUES({{_sRetryIntervals}}), {{_sNextRunAt}} = VALUES({{_sNextRunAt}}), - {{_sUpdatedAt}} = {14}; + {{_sUpdatedAt}} = {15}; """; return FormattableStringFactory.Create( format, @@ -197,6 +200,7 @@ ON DUPLICATE KEY UPDATE (int)entity.MisfirePolicy, entity.MaxCatchUp, entity.Enabled, + entity.PartitionKey, retryIntervals, entity.NextRunAt, entity.LastEnqueueAt, diff --git a/src/Atomizer.EntityFrameworkCore/Providers/Sql/PostgreSqlDialect.cs b/src/Atomizer.EntityFrameworkCore/Providers/Sql/PostgreSqlDialect.cs index ab7062f..912480e 100644 --- a/src/Atomizer.EntityFrameworkCore/Providers/Sql/PostgreSqlDialect.cs +++ b/src/Atomizer.EntityFrameworkCore/Providers/Sql/PostgreSqlDialect.cs @@ -149,6 +149,7 @@ public override FormattableString UpsertSchedule(AtomizerSchedule schedule, Date {{_sMisfirePolicy}}, {{_sMaxCatchUp}}, {{_sEnabled}}, + {{_sPartitionKey}}, {{_sRetryIntervals}}, {{_sNextRunAt}}, {{_sLastEnqueueAt}}, @@ -169,7 +170,8 @@ public override FormattableString UpsertSchedule(AtomizerSchedule schedule, Date {11}, {12}, {13}, - {14} + {14}, + {15} ) ON CONFLICT ({{_sJobKey}}) DO UPDATE SET {{_sQueueKey}} = EXCLUDED.{{_sQueueKey}}, @@ -180,6 +182,7 @@ ON CONFLICT ({{_sJobKey}}) DO UPDATE SET {{_sMisfirePolicy}} = EXCLUDED.{{_sMisfirePolicy}}, {{_sMaxCatchUp}} = EXCLUDED.{{_sMaxCatchUp}}, {{_sEnabled}} = EXCLUDED.{{_sEnabled}}, + {{_sPartitionKey}} = EXCLUDED.{{_sPartitionKey}}, {{_sRetryIntervals}} = EXCLUDED.{{_sRetryIntervals}}, {{_sNextRunAt}} = EXCLUDED.{{_sNextRunAt}}, {{_sUpdatedAt}} = EXCLUDED.{{_sUpdatedAt}}; @@ -196,6 +199,7 @@ ON CONFLICT ({{_sJobKey}}) DO UPDATE SET (int)entity.MisfirePolicy, entity.MaxCatchUp, entity.Enabled, + entity.PartitionKey, retryIntervals, entity.NextRunAt, entity.LastEnqueueAt, diff --git a/src/Atomizer.EntityFrameworkCore/Providers/Sql/SqlServerDialect.cs b/src/Atomizer.EntityFrameworkCore/Providers/Sql/SqlServerDialect.cs index b0ee014..f0092d0 100644 --- a/src/Atomizer.EntityFrameworkCore/Providers/Sql/SqlServerDialect.cs +++ b/src/Atomizer.EntityFrameworkCore/Providers/Sql/SqlServerDialect.cs @@ -146,9 +146,10 @@ WHEN MATCHED THEN UPDATE SET {{_sMisfirePolicy}} = {6}, {{_sMaxCatchUp}} = {7}, {{_sEnabled}} = {8}, - {{_sRetryIntervals}} = {9}, - {{_sNextRunAt}} = {10}, - {{_sUpdatedAt}} = {11} + {{_sPartitionKey}} = {9}, + {{_sRetryIntervals}} = {10}, + {{_sNextRunAt}} = {11}, + {{_sUpdatedAt}} = {12} WHEN NOT MATCHED THEN INSERT ( {{_sId}}, {{_sJobKey}}, @@ -160,13 +161,14 @@ WHEN NOT MATCHED THEN INSERT ( {{_sMisfirePolicy}}, {{_sMaxCatchUp}}, {{_sEnabled}}, + {{_sPartitionKey}}, {{_sRetryIntervals}}, {{_sNextRunAt}}, {{_sLastEnqueueAt}}, {{_sCreatedAt}}, {{_sUpdatedAt}} ) VALUES ( - {12}, + {13}, {0}, {1}, {2}, @@ -178,9 +180,10 @@ WHEN NOT MATCHED THEN INSERT ( {8}, {9}, {10}, - {13}, + {11}, {14}, - {11} + {15}, + {12} ); """; return FormattableStringFactory.Create( @@ -194,12 +197,13 @@ WHEN NOT MATCHED THEN INSERT ( (int)entity.MisfirePolicy, // {6} entity.MaxCatchUp, // {7} entity.Enabled ? 1 : 0, // {8} - retryIntervals, // {9} - entity.NextRunAt, // {10} - now, // {11} - entity.Id, // {12} - entity.LastEnqueueAt, // {13} - entity.CreatedAt // {14} + entity.PartitionKey, // {9} + retryIntervals, // {10} + entity.NextRunAt, // {11} + now, // {12} + entity.Id, // {13} + entity.LastEnqueueAt, // {14} + entity.CreatedAt // {15} ); } } diff --git a/tests/Atomizer.EntityFrameworkCore.Tests/Providers/MySqlDialectTests.cs b/tests/Atomizer.EntityFrameworkCore.Tests/Providers/MySqlDialectTests.cs index 9aefd83..e82dd8e 100644 --- a/tests/Atomizer.EntityFrameworkCore.Tests/Providers/MySqlDialectTests.cs +++ b/tests/Atomizer.EntityFrameworkCore.Tests/Providers/MySqlDialectTests.cs @@ -76,5 +76,6 @@ public void UpsertSchedule_WhenCalled_ShouldContainOnDuplicateKeyUpdate() var sql = dialect.UpsertSchedule(schedule, DateTimeOffset.UtcNow); sql.Format.Should().Contain("ON DUPLICATE KEY UPDATE"); + sql.Format.Should().Contain("PartitionKey"); } } diff --git a/tests/Atomizer.EntityFrameworkCore.Tests/Providers/PostgreSqlDialectTests.cs b/tests/Atomizer.EntityFrameworkCore.Tests/Providers/PostgreSqlDialectTests.cs index 35bcd51..187807a 100644 --- a/tests/Atomizer.EntityFrameworkCore.Tests/Providers/PostgreSqlDialectTests.cs +++ b/tests/Atomizer.EntityFrameworkCore.Tests/Providers/PostgreSqlDialectTests.cs @@ -78,5 +78,6 @@ public void UpsertSchedule_WhenCalled_ShouldContainOnConflict() sql.Format.Should().Contain("ON CONFLICT"); sql.Format.Should().Contain("DO UPDATE SET"); + sql.Format.Should().Contain("PartitionKey"); } } diff --git a/tests/Atomizer.EntityFrameworkCore.Tests/Providers/SqlServerDialectTests.cs b/tests/Atomizer.EntityFrameworkCore.Tests/Providers/SqlServerDialectTests.cs index d86c308..ebd5d04 100644 --- a/tests/Atomizer.EntityFrameworkCore.Tests/Providers/SqlServerDialectTests.cs +++ b/tests/Atomizer.EntityFrameworkCore.Tests/Providers/SqlServerDialectTests.cs @@ -78,5 +78,6 @@ public void UpsertSchedule_WhenCalled_ShouldContainMergeWithHoldlock() sql.Format.Should().Contain("MERGE"); sql.Format.Should().Contain("WITH (HOLDLOCK)"); + sql.Format.Should().Contain("PartitionKey"); } } diff --git a/tests/Atomizer.EntityFrameworkCore.Tests/Storage/EntityFrameworkCoreStorageTests.cs b/tests/Atomizer.EntityFrameworkCore.Tests/Storage/EntityFrameworkCoreStorageTests.cs index 311aaa9..149ef5c 100644 --- a/tests/Atomizer.EntityFrameworkCore.Tests/Storage/EntityFrameworkCoreStorageTests.cs +++ b/tests/Atomizer.EntityFrameworkCore.Tests/Storage/EntityFrameworkCoreStorageTests.cs @@ -566,7 +566,8 @@ public async Task UpsertScheduleAsync_WhenScheduleDoesNotExist_ShouldInsertSched """{ "message": "New Schedule" }""", Schedule.EveryMinute, TimeZoneInfo.Utc, - now + now, + partitionKey: new PartitionKey("scheduled-partition") ); await using var dbContext = _dbContextFactory(); @@ -583,9 +584,11 @@ public async Task UpsertScheduleAsync_WhenScheduleDoesNotExist_ShouldInsertSched insertedScheduleEntity.Should().NotBeNull(); insertedScheduleEntity.JobKey.Should().Be(schedule.JobKey); insertedScheduleEntity.Payload.Should().Be(schedule.Payload); + insertedScheduleEntity.PartitionKey.Should().Be("scheduled-partition"); var map = () => insertedScheduleEntity.ToAtomizerSchedule(); map.Should().NotThrow(); + insertedScheduleEntity.ToAtomizerSchedule().PartitionKey.Should().Be(schedule.PartitionKey); } [Fact] @@ -654,6 +657,7 @@ public async Task UpsertScheduleAsync_WhenScheduleExists_ShouldUpdateSchedule() // Act schedule.Payload = """{ "message": "Updated Schedule" }"""; + schedule.PartitionKey = new PartitionKey("updated-partition"); var scheduleId = await storage.UpsertScheduleAsync(schedule, CancellationToken.None); var updatedScheduleEntity = await dbContext .Set() @@ -663,9 +667,11 @@ public async Task UpsertScheduleAsync_WhenScheduleExists_ShouldUpdateSchedule() scheduleId.Should().Be(schedule.Id); updatedScheduleEntity.Should().NotBeNull(); updatedScheduleEntity.Payload.Should().Be("""{ "message": "Updated Schedule" }"""); + updatedScheduleEntity.PartitionKey.Should().Be("updated-partition"); var map = () => updatedScheduleEntity.ToAtomizerSchedule(); map.Should().NotThrow(); + updatedScheduleEntity.ToAtomizerSchedule().PartitionKey.Should().Be(schedule.PartitionKey); } [Fact] diff --git a/tests/Atomizer.Tests/Processing/JobWorkerTests.cs b/tests/Atomizer.Tests/Processing/JobWorkerTests.cs index 6716c5a..3a2c40a 100644 --- a/tests/Atomizer.Tests/Processing/JobWorkerTests.cs +++ b/tests/Atomizer.Tests/Processing/JobWorkerTests.cs @@ -135,8 +135,9 @@ public async Task RunAsync_WhenReaderThrowsUnexpected_ShouldLogAndContinueToNext var ioCts = new CancellationTokenSource(); var executionCts = new CancellationTokenSource(); - var nextJob = _job; // the job that should be processed after the read failure - var reader = new ThrowThenReturnReader(nextJob); + var maxReadAttempts = NonPublicSpy.GetConstant("MaxReadAttempts"); + var nextJob = _job; // the job that should be processed after the read failures are skipped + var reader = new ThrowThenReturnReader(nextJob, maxReadAttempts); var processor = Substitute.For(); var processedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -164,7 +165,6 @@ public async Task RunAsync_WhenReaderThrowsUnexpected_ShouldLogAndContinueToNext (await WaitOrTimeout(run, Timeout)).Should().BeTrue(); // Assert - var maxReadAttempts = NonPublicSpy.GetConstant("MaxReadAttempts"); _logger .Received(maxReadAttempts) .LogWarning(Arg.Any(), $"Worker {_workerId} channel read operation failed"); @@ -286,34 +286,36 @@ private static async Task WaitOrTimeout(Task task, TimeSpan timeout) private sealed class ThrowThenReturnReader : ChannelReader { private readonly AtomizerJob _next; - private int _reads; + private readonly int _failuresBeforeJob; + private int _readAttempts; + private int _returned; - private readonly List _items = new(); - - public ThrowThenReturnReader(AtomizerJob next) + public ThrowThenReturnReader(AtomizerJob next, int failuresBeforeJob) { _next = next; - - _items.Add(new Exception("unexpected read error")); - _items.Add(next); + _failuresBeforeJob = failuresBeforeJob; } public override async ValueTask ReadAsync(CancellationToken cancellationToken = default) { - cancellationToken.ThrowIfCancellationRequested(); + var attempt = Interlocked.Increment(ref _readAttempts); - await Task.Delay(50, cancellationToken); // simulate some delay + if (attempt <= _failuresBeforeJob) + { + throw new InvalidOperationException("unexpected read error"); + } - return _items[_reads] switch + if (Interlocked.Exchange(ref _returned, 1) == 0) { - Exception ex => throw ex, - _ => _next, - }; + return _next; + } + + await Task.Delay(System.Threading.Timeout.InfiniteTimeSpan, cancellationToken); + throw new OperationCanceledException(cancellationToken); } public override bool TryRead(out AtomizerJob item) { - Interlocked.Increment(ref _reads); item = default!; return false; } diff --git a/tests/Atomizer.Tests/Scheduling/ScheduleProcessorTests.cs b/tests/Atomizer.Tests/Scheduling/ScheduleProcessorTests.cs index 507ef74..c5f4d2a 100644 --- a/tests/Atomizer.Tests/Scheduling/ScheduleProcessorTests.cs +++ b/tests/Atomizer.Tests/Scheduling/ScheduleProcessorTests.cs @@ -58,6 +58,36 @@ await _storage } } + [Fact] + public async Task ProcessAsync_WhenScheduleHasPartitionKey_ShouldInsertJobWithPartitionKey() + { + // Arrange + var partitionKey = new PartitionKey("scheduled-partition"); + var schedule = AtomizerSchedule.Create( + new JobKey("testjob"), + QueueKey.Default, + typeof(WriteLineMessage), + "payload", + Schedule.EverySecond, + TimeZoneInfo.Utc, + _clock.UtcNow, + partitionKey: partitionKey + ); + var horizon = _clock.UtcNow.AddSeconds(1); + var token = CancellationToken.None; + + // Act + await _sut.ProcessAsync(schedule, horizon, token); + + // Assert + await _storage + .Received() + .InsertAsync( + Arg.Is(j => j.ScheduleJobKey == schedule.JobKey && j.PartitionKey == partitionKey), + token + ); + } + [Fact] public async Task ProcessAsync_WhenInsertJobThrows_ShouldLogErrorAndContinue() {