Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ protected override void Up(MigrationBuilder migrationBuilder)
MisfirePolicy = table.Column<int>(type: "int", nullable: false),
MaxCatchUp = table.Column<int>(type: "int", nullable: false),
Enabled = table.Column<bool>(type: "tinyint(1)", nullable: false),
PartitionKey = table
.Column<string>(type: "varchar(255)", maxLength: 255, nullable: true)
.Annotation("MySql:CharSet", "utf8mb4"),
RetryIntervals = table
.Column<string>(type: "varchar(4096)", maxLength: 4096, nullable: false)
.Annotation("MySql:CharSet", "utf8mb4"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ protected override void BuildModel(ModelBuilder modelBuilder)
b.Property<bool>("Enabled")
.HasColumnType("tinyint(1)");

b.Property<string>("PartitionKey")
.HasMaxLength(255)
.HasColumnType("varchar(255)");

b.Property<string>("JobKey")
.IsRequired()
.HasMaxLength(255)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ protected override void Up(MigrationBuilder migrationBuilder)
MisfirePolicy = table.Column<int>(type: "integer", nullable: false),
MaxCatchUp = table.Column<int>(type: "integer", nullable: false),
Enabled = table.Column<bool>(type: "boolean", nullable: false),
PartitionKey = table.Column<string>(type: "character varying(255)", maxLength: 255, nullable: true),
RetryIntervals = table.Column<string>(
type: "character varying(4096)",
maxLength: 4096,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ protected override void BuildModel(ModelBuilder modelBuilder)
b.Property<bool>("Enabled")
.HasColumnType("boolean");

b.Property<string>("PartitionKey")
.HasMaxLength(255)
.HasColumnType("character varying(255)");

b.Property<string>("JobKey")
.IsRequired()
.HasMaxLength(255)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ protected override void Up(MigrationBuilder migrationBuilder)
MisfirePolicy = table.Column<int>(type: "int", nullable: false),
MaxCatchUp = table.Column<int>(type: "int", nullable: false),
Enabled = table.Column<bool>(type: "bit", nullable: false),
PartitionKey = table.Column<string>(type: "nvarchar(255)", maxLength: 255, nullable: true),
RetryIntervals = table.Column<string>(type: "nvarchar(max)", maxLength: 4096, nullable: false),
NextRunAt = table.Column<DateTimeOffset>(type: "datetimeoffset", nullable: false),
LastEnqueueAt = table.Column<DateTimeOffset>(type: "datetimeoffset", nullable: true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ protected override void BuildModel(ModelBuilder modelBuilder)
b.Property<bool>("Enabled")
.HasColumnType("bit");

b.Property<string>("PartitionKey")
.HasMaxLength(255)
.HasColumnType("nvarchar(255)");

b.Property<string>("JobKey")
.IsRequired()
.HasMaxLength(255)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ protected override void Up(MigrationBuilder migrationBuilder)
MisfirePolicy = table.Column<int>(type: "INTEGER", nullable: false),
MaxCatchUp = table.Column<int>(type: "INTEGER", nullable: false),
Enabled = table.Column<bool>(type: "INTEGER", nullable: false),
PartitionKey = table.Column<string>(type: "TEXT", maxLength: 255, nullable: true),
RetryIntervals = table.Column<string>(type: "TEXT", maxLength: 4096, nullable: false),
NextRunAt = table.Column<DateTimeOffset>(type: "TEXT", nullable: false),
LastEnqueueAt = table.Column<DateTimeOffset>(type: "TEXT", nullable: true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ protected override void BuildModel(ModelBuilder modelBuilder)
b.Property<bool>("Enabled")
.HasColumnType("INTEGER");

b.Property<string>("PartitionKey")
.HasMaxLength(255)
.HasColumnType("TEXT");

b.Property<string>("JobKey")
.IsRequired()
.HasMaxLength(255)
Expand Down
6 changes: 5 additions & 1 deletion samples/Atomizer.EFCore.Example/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ await atomizer.ScheduleRecurringAsync(
new LoggerJobPayload("Recurring job started", LogLevel.Information),
"LoggerJobCatchUp",
Schedule.Cron("0/5 * * * * *"), // Every 5 seconds,
options => options.MisfirePolicy = MisfirePolicy.CatchUp
options =>
{
options.MisfirePolicy = MisfirePolicy.CatchUp;
options.PartitionKey = new PartitionKey("LoggerJobCatchUp");
}
);

app.MapPost(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public void Configure(EntityTypeBuilder<AtomizerScheduleEntity> builder)
builder.Property(e => e.MisfirePolicy).IsRequired();
builder.Property(e => e.MaxCatchUp).IsRequired();
builder.Property(e => e.Enabled).IsRequired();
builder.Property(e => e.PartitionKey).HasMaxLength(255).IsRequired(false);
builder.Property(e => e.NextRunAt).IsRequired();
builder.Property(e => e.LastEnqueueAt);
builder.Property(e => e.CreatedAt).IsRequired();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public class AtomizerScheduleEntity
/// <summary>Gets or sets whether this schedule is currently active.</summary>
public bool Enabled { get; set; } = true;

/// <summary>Gets or sets the partition key forwarded to generated jobs, if any.</summary>
public string? PartitionKey { get; set; }

/// <summary>Gets or sets the serialized retry intervals for generated jobs.</summary>
public TimeSpan[] RetryIntervals { get; set; } = [];

Expand Down Expand Up @@ -90,6 +93,7 @@ public static AtomizerScheduleEntity ToEntity(this AtomizerSchedule schedule)
MisfirePolicy = (MisfirePolicyEntity)(int)schedule.MisfirePolicy,
MaxCatchUp = schedule.MaxCatchUp,
Enabled = schedule.Enabled,
PartitionKey = schedule.PartitionKey?.Key,
RetryIntervals = schedule.RetryStrategy.RetryIntervals,
NextRunAt = schedule.NextRunAt,
LastEnqueueAt = schedule.LastEnqueueAt,
Expand Down Expand Up @@ -117,6 +121,7 @@ public static AtomizerSchedule ToAtomizerSchedule(this AtomizerScheduleEntity en
MisfirePolicy = (MisfirePolicy)(int)entity.MisfirePolicy,
MaxCatchUp = entity.MaxCatchUp,
Enabled = entity.Enabled,
PartitionKey = entity.PartitionKey is null ? null : new PartitionKey(entity.PartitionKey),
RetryStrategy =
entity.RetryIntervals.Length == 0 ? RetryStrategy.None : RetryStrategy.Intervals(entity.RetryIntervals),
NextRunAt = entity.NextRunAt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ internal abstract class BaseSqlDialect : ISqlDialect
protected readonly string _sMisfirePolicy;
protected readonly string _sMaxCatchUp;
protected readonly string _sEnabled;
protected readonly string _sPartitionKey;
protected readonly string _sRetryIntervals;
protected readonly string _sNextRunAt;
protected readonly string _sLastEnqueueAt;
Expand Down Expand Up @@ -78,6 +79,7 @@ protected BaseSqlDialect(EntityMap jobs, EntityMap schedules)
_sMisfirePolicy = sc[nameof(AtomizerScheduleEntity.MisfirePolicy)];
_sMaxCatchUp = sc[nameof(AtomizerScheduleEntity.MaxCatchUp)];
_sEnabled = sc[nameof(AtomizerScheduleEntity.Enabled)];
_sPartitionKey = sc[nameof(AtomizerScheduleEntity.PartitionKey)];
_sRetryIntervals = sc[nameof(AtomizerScheduleEntity.RetryIntervals)];
_sNextRunAt = sc[nameof(AtomizerScheduleEntity.NextRunAt)];
_sLastEnqueueAt = sc[nameof(AtomizerScheduleEntity.LastEnqueueAt)];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public override FormattableString UpsertSchedule(AtomizerSchedule schedule, Date
{{_sMisfirePolicy}},
{{_sMaxCatchUp}},
{{_sEnabled}},
{{_sPartitionKey}},
{{_sRetryIntervals}},
{{_sNextRunAt}},
{{_sLastEnqueueAt}},
Expand All @@ -170,7 +171,8 @@ public override FormattableString UpsertSchedule(AtomizerSchedule schedule, Date
{11},
{12},
{13},
{14}
{14},
{15}
)
ON DUPLICATE KEY UPDATE
{{_sQueueKey}} = VALUES({{_sQueueKey}}),
Expand All @@ -181,9 +183,10 @@ ON DUPLICATE KEY UPDATE
{{_sMisfirePolicy}} = VALUES({{_sMisfirePolicy}}),
{{_sMaxCatchUp}} = VALUES({{_sMaxCatchUp}}),
{{_sEnabled}} = VALUES({{_sEnabled}}),
{{_sPartitionKey}} = VALUES({{_sPartitionKey}}),
{{_sRetryIntervals}} = VALUES({{_sRetryIntervals}}),
{{_sNextRunAt}} = VALUES({{_sNextRunAt}}),
{{_sUpdatedAt}} = {14};
{{_sUpdatedAt}} = {15};
""";
return FormattableStringFactory.Create(
format,
Expand All @@ -197,6 +200,7 @@ ON DUPLICATE KEY UPDATE
(int)entity.MisfirePolicy,
entity.MaxCatchUp,
entity.Enabled,
entity.PartitionKey,
retryIntervals,
entity.NextRunAt,
entity.LastEnqueueAt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public override FormattableString UpsertSchedule(AtomizerSchedule schedule, Date
{{_sMisfirePolicy}},
{{_sMaxCatchUp}},
{{_sEnabled}},
{{_sPartitionKey}},
{{_sRetryIntervals}},
{{_sNextRunAt}},
{{_sLastEnqueueAt}},
Expand All @@ -169,7 +170,8 @@ public override FormattableString UpsertSchedule(AtomizerSchedule schedule, Date
{11},
{12},
{13},
{14}
{14},
{15}
)
ON CONFLICT ({{_sJobKey}}) DO UPDATE SET
{{_sQueueKey}} = EXCLUDED.{{_sQueueKey}},
Expand All @@ -180,6 +182,7 @@ ON CONFLICT ({{_sJobKey}}) DO UPDATE SET
{{_sMisfirePolicy}} = EXCLUDED.{{_sMisfirePolicy}},
{{_sMaxCatchUp}} = EXCLUDED.{{_sMaxCatchUp}},
{{_sEnabled}} = EXCLUDED.{{_sEnabled}},
{{_sPartitionKey}} = EXCLUDED.{{_sPartitionKey}},
{{_sRetryIntervals}} = EXCLUDED.{{_sRetryIntervals}},
{{_sNextRunAt}} = EXCLUDED.{{_sNextRunAt}},
{{_sUpdatedAt}} = EXCLUDED.{{_sUpdatedAt}};
Expand All @@ -196,6 +199,7 @@ ON CONFLICT ({{_sJobKey}}) DO UPDATE SET
(int)entity.MisfirePolicy,
entity.MaxCatchUp,
entity.Enabled,
entity.PartitionKey,
retryIntervals,
entity.NextRunAt,
entity.LastEnqueueAt,
Expand Down
28 changes: 16 additions & 12 deletions src/Atomizer.EntityFrameworkCore/Providers/Sql/SqlServerDialect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,10 @@ WHEN MATCHED THEN UPDATE SET
{{_sMisfirePolicy}} = {6},
{{_sMaxCatchUp}} = {7},
{{_sEnabled}} = {8},
{{_sRetryIntervals}} = {9},
{{_sNextRunAt}} = {10},
{{_sUpdatedAt}} = {11}
{{_sPartitionKey}} = {9},
{{_sRetryIntervals}} = {10},
{{_sNextRunAt}} = {11},
{{_sUpdatedAt}} = {12}
WHEN NOT MATCHED THEN INSERT (
{{_sId}},
{{_sJobKey}},
Expand All @@ -160,13 +161,14 @@ WHEN NOT MATCHED THEN INSERT (
{{_sMisfirePolicy}},
{{_sMaxCatchUp}},
{{_sEnabled}},
{{_sPartitionKey}},
{{_sRetryIntervals}},
{{_sNextRunAt}},
{{_sLastEnqueueAt}},
{{_sCreatedAt}},
{{_sUpdatedAt}}
) VALUES (
{12},
{13},
{0},
{1},
{2},
Expand All @@ -178,9 +180,10 @@ WHEN NOT MATCHED THEN INSERT (
{8},
{9},
{10},
{13},
{11},
{14},
{11}
{15},
{12}
);
""";
return FormattableStringFactory.Create(
Expand All @@ -194,12 +197,13 @@ WHEN NOT MATCHED THEN INSERT (
(int)entity.MisfirePolicy, // {6}
entity.MaxCatchUp, // {7}
entity.Enabled ? 1 : 0, // {8}
retryIntervals, // {9}
entity.NextRunAt, // {10}
now, // {11}
entity.Id, // {12}
entity.LastEnqueueAt, // {13}
entity.CreatedAt // {14}
entity.PartitionKey, // {9}
retryIntervals, // {10}
entity.NextRunAt, // {11}
now, // {12}
entity.Id, // {13}
entity.LastEnqueueAt, // {14}
entity.CreatedAt // {15}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,6 @@ public void UpsertSchedule_WhenCalled_ShouldContainOnDuplicateKeyUpdate()
var sql = dialect.UpsertSchedule(schedule, DateTimeOffset.UtcNow);

sql.Format.Should().Contain("ON DUPLICATE KEY UPDATE");
sql.Format.Should().Contain("PartitionKey");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,6 @@ public void UpsertSchedule_WhenCalled_ShouldContainOnConflict()

sql.Format.Should().Contain("ON CONFLICT");
sql.Format.Should().Contain("DO UPDATE SET");
sql.Format.Should().Contain("PartitionKey");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,6 @@ public void UpsertSchedule_WhenCalled_ShouldContainMergeWithHoldlock()

sql.Format.Should().Contain("MERGE");
sql.Format.Should().Contain("WITH (HOLDLOCK)");
sql.Format.Should().Contain("PartitionKey");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,8 @@ public async Task UpsertScheduleAsync_WhenScheduleDoesNotExist_ShouldInsertSched
"""{ "message": "New Schedule" }""",
Schedule.EveryMinute,
TimeZoneInfo.Utc,
now
now,
partitionKey: new PartitionKey("scheduled-partition")
);

await using var dbContext = _dbContextFactory();
Expand All @@ -583,9 +584,11 @@ public async Task UpsertScheduleAsync_WhenScheduleDoesNotExist_ShouldInsertSched
insertedScheduleEntity.Should().NotBeNull();
insertedScheduleEntity.JobKey.Should().Be(schedule.JobKey);
insertedScheduleEntity.Payload.Should().Be(schedule.Payload);
insertedScheduleEntity.PartitionKey.Should().Be("scheduled-partition");

var map = () => insertedScheduleEntity.ToAtomizerSchedule();
map.Should().NotThrow();
insertedScheduleEntity.ToAtomizerSchedule().PartitionKey.Should().Be(schedule.PartitionKey);
}

[Fact]
Expand Down Expand Up @@ -654,6 +657,7 @@ public async Task UpsertScheduleAsync_WhenScheduleExists_ShouldUpdateSchedule()

// Act
schedule.Payload = """{ "message": "Updated Schedule" }""";
schedule.PartitionKey = new PartitionKey("updated-partition");
var scheduleId = await storage.UpsertScheduleAsync(schedule, CancellationToken.None);
var updatedScheduleEntity = await dbContext
.Set<AtomizerScheduleEntity>()
Expand All @@ -663,9 +667,11 @@ public async Task UpsertScheduleAsync_WhenScheduleExists_ShouldUpdateSchedule()
scheduleId.Should().Be(schedule.Id);
updatedScheduleEntity.Should().NotBeNull();
updatedScheduleEntity.Payload.Should().Be("""{ "message": "Updated Schedule" }""");
updatedScheduleEntity.PartitionKey.Should().Be("updated-partition");

var map = () => updatedScheduleEntity.ToAtomizerSchedule();
map.Should().NotThrow();
updatedScheduleEntity.ToAtomizerSchedule().PartitionKey.Should().Be(schedule.PartitionKey);
}

[Fact]
Expand Down
Loading
Loading