Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,24 +120,26 @@ Use `UsePostgres(postgres => postgres.DataSource = dataSource)` when an applicat

## Enqueue Jobs

Job methods return `Task` or `ValueTask` and receive the scheduler-owned `CancellationToken`. Use `Job.Context` when a handler needs durable logs, progress events, the job id, or the attempt number.
Job methods return `Task` or `ValueTask` and receive the scheduler-owned `CancellationToken`. Use constructor-injected `ILogger<T>` for durable job logs, `Job.Context` when a handler needs the job id or attempt number, and scheduler-supplied `IProgress<decimal>` for durable progress updates.

```csharp
public sealed class EmailJobs
using Microsoft.Extensions.Logging;

public sealed class EmailJobs(ILogger<EmailJobs> logger)
{
public async Task SendWelcomeAsync(
Guid userId,
IJobContext job,
IProgress<decimal> progress,
CancellationToken cancellationToken)
{
await job.LogAsync(JobLogLevel.Information, "Sending welcome email.", cancellationToken: cancellationToken);
logger.LogInformation("Sending welcome email for user {UserId}.", userId);
await SendEmailAsync(userId, cancellationToken);
await job.ReportProgressAsync(100, "Welcome email sent.", cancellationToken);
progress.Report(100);
}
}

var jobId = await enqueuer.EnqueueAsync<EmailJobs>(
(jobs, ct) => jobs.SendWelcomeAsync(userId, Job.Context, ct),
(jobs, ct, progress) => jobs.SendWelcomeAsync(userId, progress, ct),
new JobSubmission(
Priority: 10,
ConcurrencyGroupKeys: ["email"],
Expand All @@ -146,6 +148,17 @@ var jobId = await enqueuer.EnqueueAsync<EmailJobs>(
cancellationToken);
```

Sheddueller captures job logs by registering a Microsoft `ILoggerProvider`. If the application uses Serilog as the host logger and still wants Sheddueller's provider to receive log events, enable provider forwarding:

```csharp
builder.Host.UseSerilog(
(_, _, loggerConfiguration) => loggerConfiguration.WriteTo.Logger(Log.Logger),
preserveStaticLogger: true,
writeToProviders: true);
```

Because Sheddueller's capture is a Microsoft `ILoggerProvider`, Microsoft logging filters still apply. Configure `Logging`/`ILoggingBuilder` filters alongside any Serilog filtering rules if you want to control which job logs are stored by Sheddueller.

Use `NotBeforeUtc` for delayed jobs. Use `JobIdempotencyKind.MethodAndArguments` to reuse an existing queued job with the same target method and serialized arguments.

## Recurring Schedules
Expand All @@ -156,7 +169,7 @@ Recurring schedules are keyed definitions. Calling `CreateOrUpdateAsync` at star
await schedules.CreateOrUpdateAsync<EmailJobs>(
"email:daily-digest",
"0 2 * * *",
(jobs, ct) => jobs.SendDailyDigestAsync(Job.Context, ct),
(jobs, ct, progress) => jobs.SendDailyDigestAsync(progress, ct),
new RecurringScheduleOptions(
Priority: 5,
ConcurrencyGroupKeys: ["email"],
Expand Down Expand Up @@ -191,7 +204,7 @@ var capture = provider.GetRequiredService<CapturingJobEnqueuer>().Capture();
await subject.DoSomethingThatEnqueuesAsync();

var matches = await capture.Fake.MatchAsync<EmailJobs>(
(jobs, ct) => jobs.SendWelcomeAsync(userId, Job.Context, ct));
(jobs, ct, progress) => jobs.SendWelcomeAsync(userId, progress, ct));
```

The same package includes `FakeJobEnqueuer`, `FakeRecurringScheduleManager`, and async-context-aware capture services for dependency-injected tests.
Expand Down
119 changes: 94 additions & 25 deletions samples/Sheddueller.SampleHost/DemoJobs/DemoJobService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ namespace Sheddueller.SampleHost.DemoJobs;

using System.Globalization;

public sealed class DemoJobService(DemoJobState state)
using Microsoft.Extensions.Logging;

public sealed class DemoJobService(
DemoJobState state,
ILogger<DemoJobService> logger)
{
private readonly DemoJobState _state = state;
private readonly ILogger<DemoJobService> _logger = logger;

public Task RunQuickAsync(string label, CancellationToken cancellationToken)
{
Expand All @@ -13,14 +18,16 @@ public Task RunQuickAsync(string label, CancellationToken cancellationToken)
return Task.CompletedTask;
}

public async Task RunProgressAsync(string label, IJobContext jobContext, CancellationToken cancellationToken)
public async Task RunProgressAsync(
string label,
IProgress<decimal> progress,
CancellationToken cancellationToken)
{
for (var step = 0; step <= 4; step++)
{
var percent = step * 25;
var message = $"{label} step {step + 1}/5";
await jobContext.LogAsync(JobLogLevel.Information, message, cancellationToken: cancellationToken).ConfigureAwait(false);
await jobContext.ReportProgressAsync(percent, message, cancellationToken).ConfigureAwait(false);
ProgressStep(this._logger, label, step + 1, null);
progress.Report(percent);

if (step < 4)
{
Expand All @@ -29,61 +36,123 @@ public async Task RunProgressAsync(string label, IJobContext jobContext, Cancell
}
}

public async Task RunRetryUntilSuccessAsync(
public Task RunRetryUntilSuccessAsync(
string runKey,
int failuresBeforeSuccess,
IJobContext jobContext,
IProgress<decimal> progress,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

var attempt = this._state.IncrementAttemptCount(runKey);
var message = $"retry demo {runKey} attempt {attempt}";
await jobContext.LogAsync(JobLogLevel.Information, message, cancellationToken: cancellationToken).ConfigureAwait(false);
RetryAttempt(this._logger, runKey, attempt, null);

if (attempt <= failuresBeforeSuccess)
{
throw new InvalidOperationException($"{message} failed on purpose.");
}

await jobContext.ReportProgressAsync(100, $"{message} succeeded", cancellationToken).ConfigureAwait(false);
progress.Report(100);
return Task.CompletedTask;
}

public async Task RunAlwaysFailAsync(string label, IJobContext jobContext, CancellationToken cancellationToken)
public Task RunAlwaysFailAsync(string label, CancellationToken cancellationToken)
{
await jobContext.LogAsync(JobLogLevel.Error, $"{label} is about to fail permanently.", cancellationToken: cancellationToken).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
PermanentFailure(this._logger, label, null);
throw new InvalidOperationException($"{label} failed on purpose.");
}

public async Task RunGroupHoldAsync(string label, IJobContext jobContext, CancellationToken cancellationToken)
public async Task RunGroupHoldAsync(
string label,
IProgress<decimal> progress,
CancellationToken cancellationToken)
{
await jobContext.LogAsync(JobLogLevel.Information, $"{label} claimed the demo concurrency slot.", cancellationToken: cancellationToken).ConfigureAwait(false);
GroupSlotClaimed(this._logger, label, null);

for (var step = 1; step <= 6; step++)
{
var percent = step * (100d / 6d);
await jobContext.ReportProgressAsync(percent, $"{label} holding slot {step}/6", cancellationToken).ConfigureAwait(false);
var percent = step * (100m / 6m);
GroupSlotHeld(this._logger, label, step, null);
progress.Report(percent);
await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken).ConfigureAwait(false);
}
}

public async Task RunIdempotentDemoAsync(string label, IJobContext jobContext, CancellationToken cancellationToken)
public async Task RunIdempotentDemoAsync(
string label,
IProgress<decimal> progress,
CancellationToken cancellationToken)
{
await jobContext.LogAsync(JobLogLevel.Information, $"{label} started its 10-second idempotency demo run.", cancellationToken: cancellationToken)
.ConfigureAwait(false);
IdempotentRunStarted(this._logger, label, null);

for (var step = 1; step <= 10; step++)
{
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false);
var percent = step * 10;
await jobContext.ReportProgressAsync(percent, $"{label} idempotent run {step}/10", cancellationToken).ConfigureAwait(false);
IdempotentRunStep(this._logger, label, step, null);
progress.Report(percent);
}
}

public async Task RunRecurringAsync(IJobContext jobContext, CancellationToken cancellationToken)
public Task RunRecurringAsync(
IProgress<decimal> progress,
CancellationToken cancellationToken)
{
var message = string.Create(
CultureInfo.InvariantCulture,
$"Recurring demo fired at {DateTimeOffset.UtcNow:O}");
await jobContext.LogAsync(JobLogLevel.Information, message, cancellationToken: cancellationToken).ConfigureAwait(false);
await jobContext.ReportProgressAsync(100, "Recurring occurrence completed", cancellationToken).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();

var firedAtUtc = DateTimeOffset.UtcNow.ToString("O", CultureInfo.InvariantCulture);
RecurringFired(this._logger, firedAtUtc, null);
progress.Report(100);
return Task.CompletedTask;
}

private static readonly Action<ILogger, string, int, Exception?> ProgressStep =
LoggerMessage.Define<string, int>(
LogLevel.Information,
new EventId(100, nameof(ProgressStep)),
"{Label} step {Step}/5");

private static readonly Action<ILogger, string, int, Exception?> RetryAttempt =
LoggerMessage.Define<string, int>(
LogLevel.Information,
new EventId(101, nameof(RetryAttempt)),
"retry demo {RunKey} attempt {Attempt}");

private static readonly Action<ILogger, string, Exception?> PermanentFailure =
LoggerMessage.Define<string>(
LogLevel.Error,
new EventId(102, nameof(PermanentFailure)),
"{Label} is about to fail permanently.");

private static readonly Action<ILogger, string, Exception?> GroupSlotClaimed =
LoggerMessage.Define<string>(
LogLevel.Information,
new EventId(103, nameof(GroupSlotClaimed)),
"{Label} claimed the demo concurrency slot.");

private static readonly Action<ILogger, string, int, Exception?> GroupSlotHeld =
LoggerMessage.Define<string, int>(
LogLevel.Information,
new EventId(104, nameof(GroupSlotHeld)),
"{Label} holding slot {Step}/6");

private static readonly Action<ILogger, string, Exception?> IdempotentRunStarted =
LoggerMessage.Define<string>(
LogLevel.Information,
new EventId(105, nameof(IdempotentRunStarted)),
"{Label} started its 10-second idempotency demo run.");

private static readonly Action<ILogger, string, int, Exception?> IdempotentRunStep =
LoggerMessage.Define<string, int>(
LogLevel.Information,
new EventId(106, nameof(IdempotentRunStep)),
"{Label} idempotent run {Step}/10");

private static readonly Action<ILogger, string, Exception?> RecurringFired =
LoggerMessage.Define<string>(
LogLevel.Information,
new EventId(107, nameof(RecurringFired)),
"Recurring demo fired at {FiredAtUtc}");
}
2 changes: 1 addition & 1 deletion samples/Sheddueller.SampleHost/LauncherPageRenderer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static string Render(string? statusMessage)
builder.AppendLine(" </div>");
builder.AppendLine(" <div class=\"card-grid\">");
AppendActionCard(builder, "/launch/quick-success", "Quick success", "Immediate completion for happy-path rows.", "Enqueue job");
AppendActionCard(builder, "/launch/progress", "Progress + logs", "Emits durable logs and progress snapshots over a few seconds.", "Enqueue job");
AppendActionCard(builder, "/launch/progress", "Progress + logs", "Emits captured ILogger logs and progress snapshots over a few seconds.", "Enqueue job");
AppendActionCard(builder, "/launch/retry-then-succeed", "Retry then succeed", "Fails twice, then succeeds so retry history is visible.", "Enqueue job");
AppendActionCard(builder, "/launch/permanent-failure", "Permanent failure", "Fails terminally without retries.", "Enqueue job");
AppendActionCard(builder, "/launch/delayed", "Delayed job", "Queues a short delayed job to exercise delayed state and not-before time.", "Enqueue job");
Expand Down
16 changes: 8 additions & 8 deletions samples/Sheddueller.SampleHost/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
app.MapPost("/launch/progress", async (IJobEnqueuer enqueuer, CancellationToken cancellationToken) =>
{
var jobId = await enqueuer.EnqueueAsync<DemoJobService>(
(service, ct) => service.RunProgressAsync("progress-demo", Job.Context, ct),
(service, ct, progress) => service.RunProgressAsync("progress-demo", progress, ct),
cancellationToken: cancellationToken).ConfigureAwait(false);
return RedirectWithMessage($"Queued progress demo job {jobId:D}.");
});
Expand All @@ -71,7 +71,7 @@
{
var runKey = $"retry-demo:{Guid.NewGuid():N}";
var jobId = await enqueuer.EnqueueAsync<DemoJobService>(
(service, ct) => service.RunRetryUntilSuccessAsync(runKey, 2, Job.Context, ct),
(service, ct, progress) => service.RunRetryUntilSuccessAsync(runKey, 2, progress, ct),
new JobSubmission(RetryPolicy: new RetryPolicy(4, RetryBackoffKind.Fixed, TimeSpan.FromSeconds(2))),
cancellationToken).ConfigureAwait(false);
return RedirectWithMessage($"Queued retry demo job {jobId:D}. It will fail twice before succeeding.");
Expand All @@ -80,7 +80,7 @@
app.MapPost("/launch/permanent-failure", async (IJobEnqueuer enqueuer, CancellationToken cancellationToken) =>
{
var jobId = await enqueuer.EnqueueAsync<DemoJobService>(
(service, ct) => service.RunAlwaysFailAsync("permanent-failure", Job.Context, ct),
(service, ct) => service.RunAlwaysFailAsync("permanent-failure", ct),
new JobSubmission(RetryPolicy: new RetryPolicy(1, RetryBackoffKind.Fixed, TimeSpan.FromSeconds(1))),
cancellationToken).ConfigureAwait(false);
return RedirectWithMessage($"Queued permanent failure job {jobId:D}.");
Expand All @@ -99,7 +99,7 @@
app.MapPost("/launch/many-tags", async (IJobEnqueuer enqueuer, CancellationToken cancellationToken) =>
{
var jobId = await enqueuer.EnqueueAsync<DemoJobService>(
(service, ct) => service.RunProgressAsync("many-tags-demo", Job.Context, ct),
(service, ct, progress) => service.RunProgressAsync("many-tags-demo", progress, ct),
new JobSubmission(
Priority: 15,
Tags:
Expand Down Expand Up @@ -129,7 +129,7 @@
{
var label = $"blocking-{index}";
var jobId = await enqueuer.EnqueueAsync<DemoJobService>(
(service, ct) => service.RunGroupHoldAsync(label, Job.Context, ct),
(service, ct, progress) => service.RunGroupHoldAsync(label, progress, ct),
new JobSubmission(Priority: 25, ConcurrencyGroupKeys: [GroupKey]),
cancellationToken).ConfigureAwait(false);
jobIds.Add(jobId);
Expand All @@ -152,7 +152,7 @@
if (!await HasNonTerminalJobsInGroupAsync(inspectionReader, GroupKey, cancellationToken).ConfigureAwait(false))
{
_ = await enqueuer.EnqueueAsync<DemoJobService>(
(service, ct) => service.RunIdempotentDemoAsync("idempotent-demo-slot-holder", Job.Context, ct),
(service, ct, progress) => service.RunIdempotentDemoAsync("idempotent-demo-slot-holder", progress, ct),
new JobSubmission(
Priority: 50,
ConcurrencyGroupKeys: [GroupKey],
Expand All @@ -161,7 +161,7 @@
}

var jobId = await enqueuer.EnqueueAsync<DemoJobService>(
(service, ct) => service.RunIdempotentDemoAsync(WorkLabel, Job.Context, ct),
(service, ct, progress) => service.RunIdempotentDemoAsync(WorkLabel, progress, ct),
new JobSubmission(
Priority: 25,
ConcurrencyGroupKeys: [GroupKey],
Expand All @@ -177,7 +177,7 @@
var result = await scheduleManager.CreateOrUpdateAsync<DemoJobService>(
"demo:recurring",
"* * * * *",
(service, ct) => service.RunRecurringAsync(Job.Context, ct),
(service, ct, progress) => service.RunRecurringAsync(progress, ct),
new RecurringScheduleOptions(Priority: 10, OverlapMode: RecurringOverlapMode.Skip),
cancellationToken).ConfigureAwait(false);
return RedirectWithMessage($"Recurring schedule 'demo:recurring' is {result}. The next occurrence will fire on the next minute boundary.");
Expand Down
2 changes: 1 addition & 1 deletion samples/Sheddueller.SampleHost/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ The sample applies PostgreSQL schema migrations automatically on startup and reg
## Launcher Scenarios

- `Quick success`: immediate completion
- `Progress + logs`: emits durable logs and progress updates
- `Progress + logs`: emits `ILogger<T>` messages captured as durable job logs, plus `IProgress<decimal>` progress updates
- `Retry then succeed`: fails twice, then succeeds
- `Permanent failure`: terminal failure without retries
- `Delayed job`: waits 30 seconds before becoming claimable
Expand Down
28 changes: 28 additions & 0 deletions src/Sheddueller.Testing/CapturingJobEnqueuer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ public ValueTask<Guid> EnqueueAsync(
CancellationToken cancellationToken = default)
=> this.CurrentOrDiscardingFake().EnqueueAsync(work, submission, cancellationToken);

/// <inheritdoc />
public ValueTask<Guid> EnqueueAsync(
Expression<Func<CancellationToken, IProgress<decimal>, Task>> work,
JobSubmission? submission = null,
CancellationToken cancellationToken = default)
=> this.CurrentOrDiscardingFake().EnqueueAsync(work, submission, cancellationToken);

/// <inheritdoc />
public ValueTask<Guid> EnqueueAsync(
Expression<Func<CancellationToken, IProgress<decimal>, ValueTask>> work,
JobSubmission? submission = null,
CancellationToken cancellationToken = default)
=> this.CurrentOrDiscardingFake().EnqueueAsync(work, submission, cancellationToken);

/// <inheritdoc />
public ValueTask<Guid> EnqueueAsync<TService>(
Expression<Func<TService, CancellationToken, Task>> work,
Expand All @@ -56,6 +70,20 @@ public ValueTask<Guid> EnqueueAsync<TService>(
CancellationToken cancellationToken = default)
=> this.CurrentOrDiscardingFake().EnqueueAsync(work, submission, cancellationToken);

/// <inheritdoc />
public ValueTask<Guid> EnqueueAsync<TService>(
Expression<Func<TService, CancellationToken, IProgress<decimal>, Task>> work,
JobSubmission? submission = null,
CancellationToken cancellationToken = default)
=> this.CurrentOrDiscardingFake().EnqueueAsync(work, submission, cancellationToken);

/// <inheritdoc />
public ValueTask<Guid> EnqueueAsync<TService>(
Expression<Func<TService, CancellationToken, IProgress<decimal>, ValueTask>> work,
JobSubmission? submission = null,
CancellationToken cancellationToken = default)
=> this.CurrentOrDiscardingFake().EnqueueAsync(work, submission, cancellationToken);

/// <inheritdoc />
public ValueTask<IReadOnlyList<Guid>> EnqueueManyAsync(
IReadOnlyList<JobEnqueueItem> jobs,
Expand Down
Loading