From e7134482969008ee8f37cbe5f30431435c9af3f1 Mon Sep 17 00:00:00 2001 From: Jack Bird Date: Mon, 27 Apr 2026 14:03:51 +0100 Subject: [PATCH 1/2] feat: add in IProgress reporting limited to decimal --- README.md | 11 +- .../DemoJobs/DemoJobService.cs | 38 ++- samples/Sheddueller.SampleHost/Program.cs | 14 +- samples/Sheddueller.SampleHost/README.md | 2 +- .../CapturingJobEnqueuer.cs | 28 ++ .../CapturingRecurringScheduleManager.cs | 36 +++ src/Sheddueller.Testing/FakeJobEnqueuer.cs | 100 ++++++ .../FakeRecurringScheduleManager.cs | 72 +++++ .../Internal/ShedduellerWorker.cs | 49 +++ src/Sheddueller/Enqueueing/JobEnqueuer.cs | 24 ++ .../Enqueueing/JobExpressionParser.cs | 122 +++++++- src/Sheddueller/IJobContext.cs | 8 - src/Sheddueller/IJobEnqueuer.cs | 50 +++ src/Sheddueller/IRecurringScheduleManager.cs | 66 ++++ .../Jobs/JobInvocationDisplayFormatter.cs | 1 + src/Sheddueller/JobEnqueueItem.cs | 62 ++++ src/Sheddueller/Runtime/JobContext.cs | 21 -- .../Runtime/RecurringScheduleManager.cs | 72 +++++ .../Storage/JobMethodParameterBindingKind.cs | 5 + .../JobMethodParameterBindingResolver.cs | 5 +- .../DashboardEndpointTests.cs | 32 ++ .../FakeJobEnqueuerTests.cs | 27 ++ .../FakeRecurringScheduleManagerTests.cs | 30 +- test/Sheddueller.Tests/JobEnqueuerTests.cs | 114 ++++++- .../JobInvocationDisplayFormatterTests.cs | 16 + .../WorkerProgressTests.cs | 286 ++++++++++++++++++ 26 files changed, 1222 insertions(+), 69 deletions(-) create mode 100644 test/Sheddueller.Worker.Tests/WorkerProgressTests.cs diff --git a/README.md b/README.md index 8455f2d..36e2088 100644 --- a/README.md +++ b/README.md @@ -120,7 +120,7 @@ Use `UsePostgres(postgres => postgres.DataSource = dataSource)` when an applicat ## Enqueue Jobs -Job methods return `Task` or `ValueTask` and receive the scheduler-owned `CancellationToken`. Use `Job.Context` when a handler needs durable logs, progress events, the job id, or the attempt number. +Job methods return `Task` or `ValueTask` and receive the scheduler-owned `CancellationToken`. Use `Job.Context` when a handler needs durable logs, the job id, or the attempt number. Use scheduler-supplied `IProgress` for durable progress updates. ```csharp public sealed class EmailJobs @@ -128,16 +128,17 @@ public sealed class EmailJobs public async Task SendWelcomeAsync( Guid userId, IJobContext job, + IProgress progress, CancellationToken cancellationToken) { await job.LogAsync(JobLogLevel.Information, "Sending welcome email.", cancellationToken: cancellationToken); await SendEmailAsync(userId, cancellationToken); - await job.ReportProgressAsync(100, "Welcome email sent.", cancellationToken); + progress.Report(100); } } var jobId = await enqueuer.EnqueueAsync( - (jobs, ct) => jobs.SendWelcomeAsync(userId, Job.Context, ct), + (jobs, ct, progress) => jobs.SendWelcomeAsync(userId, Job.Context, progress, ct), new JobSubmission( Priority: 10, ConcurrencyGroupKeys: ["email"], @@ -156,7 +157,7 @@ Recurring schedules are keyed definitions. Calling `CreateOrUpdateAsync` at star await schedules.CreateOrUpdateAsync( "email:daily-digest", "0 2 * * *", - (jobs, ct) => jobs.SendDailyDigestAsync(Job.Context, ct), + (jobs, ct, progress) => jobs.SendDailyDigestAsync(Job.Context, progress, ct), new RecurringScheduleOptions( Priority: 5, ConcurrencyGroupKeys: ["email"], @@ -191,7 +192,7 @@ var capture = provider.GetRequiredService().Capture(); await subject.DoSomethingThatEnqueuesAsync(); var matches = await capture.Fake.MatchAsync( - (jobs, ct) => jobs.SendWelcomeAsync(userId, Job.Context, ct)); + (jobs, ct, progress) => jobs.SendWelcomeAsync(userId, Job.Context, progress, ct)); ``` The same package includes `FakeJobEnqueuer`, `FakeRecurringScheduleManager`, and async-context-aware capture services for dependency-injected tests. diff --git a/samples/Sheddueller.SampleHost/DemoJobs/DemoJobService.cs b/samples/Sheddueller.SampleHost/DemoJobs/DemoJobService.cs index 1972b06..8abbb8a 100644 --- a/samples/Sheddueller.SampleHost/DemoJobs/DemoJobService.cs +++ b/samples/Sheddueller.SampleHost/DemoJobs/DemoJobService.cs @@ -13,14 +13,18 @@ public Task RunQuickAsync(string label, CancellationToken cancellationToken) return Task.CompletedTask; } - public async Task RunProgressAsync(string label, IJobContext jobContext, CancellationToken cancellationToken) + public async Task RunProgressAsync( + string label, + IJobContext jobContext, + IProgress progress, + CancellationToken cancellationToken) { for (var step = 0; step <= 4; step++) { var percent = step * 25; var message = $"{label} step {step + 1}/5"; await jobContext.LogAsync(JobLogLevel.Information, message, cancellationToken: cancellationToken).ConfigureAwait(false); - await jobContext.ReportProgressAsync(percent, message, cancellationToken).ConfigureAwait(false); + progress.Report(percent); if (step < 4) { @@ -33,6 +37,7 @@ public async Task RunRetryUntilSuccessAsync( string runKey, int failuresBeforeSuccess, IJobContext jobContext, + IProgress progress, CancellationToken cancellationToken) { var attempt = this._state.IncrementAttemptCount(runKey); @@ -44,7 +49,7 @@ public async Task RunRetryUntilSuccessAsync( throw new InvalidOperationException($"{message} failed on purpose."); } - await jobContext.ReportProgressAsync(100, $"{message} succeeded", cancellationToken).ConfigureAwait(false); + progress.Report(100); } public async Task RunAlwaysFailAsync(string label, IJobContext jobContext, CancellationToken cancellationToken) @@ -53,19 +58,28 @@ public async Task RunAlwaysFailAsync(string label, IJobContext jobContext, Cance throw new InvalidOperationException($"{label} failed on purpose."); } - public async Task RunGroupHoldAsync(string label, IJobContext jobContext, CancellationToken cancellationToken) + public async Task RunGroupHoldAsync( + string label, + IJobContext jobContext, + IProgress progress, + CancellationToken cancellationToken) { await jobContext.LogAsync(JobLogLevel.Information, $"{label} claimed the demo concurrency slot.", cancellationToken: cancellationToken).ConfigureAwait(false); for (var step = 1; step <= 6; step++) { - var percent = step * (100d / 6d); - await jobContext.ReportProgressAsync(percent, $"{label} holding slot {step}/6", cancellationToken).ConfigureAwait(false); + var percent = step * (100m / 6m); + await jobContext.LogAsync(JobLogLevel.Information, $"{label} holding slot {step}/6", cancellationToken: cancellationToken).ConfigureAwait(false); + progress.Report(percent); await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken).ConfigureAwait(false); } } - public async Task RunIdempotentDemoAsync(string label, IJobContext jobContext, CancellationToken cancellationToken) + public async Task RunIdempotentDemoAsync( + string label, + IJobContext jobContext, + IProgress progress, + CancellationToken cancellationToken) { await jobContext.LogAsync(JobLogLevel.Information, $"{label} started its 10-second idempotency demo run.", cancellationToken: cancellationToken) .ConfigureAwait(false); @@ -74,16 +88,20 @@ await jobContext.LogAsync(JobLogLevel.Information, $"{label} started its 10-seco { await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); var percent = step * 10; - await jobContext.ReportProgressAsync(percent, $"{label} idempotent run {step}/10", cancellationToken).ConfigureAwait(false); + await jobContext.LogAsync(JobLogLevel.Information, $"{label} idempotent run {step}/10", cancellationToken: cancellationToken).ConfigureAwait(false); + progress.Report(percent); } } - public async Task RunRecurringAsync(IJobContext jobContext, CancellationToken cancellationToken) + public async Task RunRecurringAsync( + IJobContext jobContext, + IProgress progress, + CancellationToken cancellationToken) { var message = string.Create( CultureInfo.InvariantCulture, $"Recurring demo fired at {DateTimeOffset.UtcNow:O}"); await jobContext.LogAsync(JobLogLevel.Information, message, cancellationToken: cancellationToken).ConfigureAwait(false); - await jobContext.ReportProgressAsync(100, "Recurring occurrence completed", cancellationToken).ConfigureAwait(false); + progress.Report(100); } } diff --git a/samples/Sheddueller.SampleHost/Program.cs b/samples/Sheddueller.SampleHost/Program.cs index 01da2ab..4645617 100644 --- a/samples/Sheddueller.SampleHost/Program.cs +++ b/samples/Sheddueller.SampleHost/Program.cs @@ -62,7 +62,7 @@ app.MapPost("/launch/progress", async (IJobEnqueuer enqueuer, CancellationToken cancellationToken) => { var jobId = await enqueuer.EnqueueAsync( - (service, ct) => service.RunProgressAsync("progress-demo", Job.Context, ct), + (service, ct, progress) => service.RunProgressAsync("progress-demo", Job.Context, progress, ct), cancellationToken: cancellationToken).ConfigureAwait(false); return RedirectWithMessage($"Queued progress demo job {jobId:D}."); }); @@ -71,7 +71,7 @@ { var runKey = $"retry-demo:{Guid.NewGuid():N}"; var jobId = await enqueuer.EnqueueAsync( - (service, ct) => service.RunRetryUntilSuccessAsync(runKey, 2, Job.Context, ct), + (service, ct, progress) => service.RunRetryUntilSuccessAsync(runKey, 2, Job.Context, progress, ct), new JobSubmission(RetryPolicy: new RetryPolicy(4, RetryBackoffKind.Fixed, TimeSpan.FromSeconds(2))), cancellationToken).ConfigureAwait(false); return RedirectWithMessage($"Queued retry demo job {jobId:D}. It will fail twice before succeeding."); @@ -99,7 +99,7 @@ app.MapPost("/launch/many-tags", async (IJobEnqueuer enqueuer, CancellationToken cancellationToken) => { var jobId = await enqueuer.EnqueueAsync( - (service, ct) => service.RunProgressAsync("many-tags-demo", Job.Context, ct), + (service, ct, progress) => service.RunProgressAsync("many-tags-demo", Job.Context, progress, ct), new JobSubmission( Priority: 15, Tags: @@ -129,7 +129,7 @@ { var label = $"blocking-{index}"; var jobId = await enqueuer.EnqueueAsync( - (service, ct) => service.RunGroupHoldAsync(label, Job.Context, ct), + (service, ct, progress) => service.RunGroupHoldAsync(label, Job.Context, progress, ct), new JobSubmission(Priority: 25, ConcurrencyGroupKeys: [GroupKey]), cancellationToken).ConfigureAwait(false); jobIds.Add(jobId); @@ -152,7 +152,7 @@ if (!await HasNonTerminalJobsInGroupAsync(inspectionReader, GroupKey, cancellationToken).ConfigureAwait(false)) { _ = await enqueuer.EnqueueAsync( - (service, ct) => service.RunIdempotentDemoAsync("idempotent-demo-slot-holder", Job.Context, ct), + (service, ct, progress) => service.RunIdempotentDemoAsync("idempotent-demo-slot-holder", Job.Context, progress, ct), new JobSubmission( Priority: 50, ConcurrencyGroupKeys: [GroupKey], @@ -161,7 +161,7 @@ } var jobId = await enqueuer.EnqueueAsync( - (service, ct) => service.RunIdempotentDemoAsync(WorkLabel, Job.Context, ct), + (service, ct, progress) => service.RunIdempotentDemoAsync(WorkLabel, Job.Context, progress, ct), new JobSubmission( Priority: 25, ConcurrencyGroupKeys: [GroupKey], @@ -177,7 +177,7 @@ var result = await scheduleManager.CreateOrUpdateAsync( "demo:recurring", "* * * * *", - (service, ct) => service.RunRecurringAsync(Job.Context, ct), + (service, ct, progress) => service.RunRecurringAsync(Job.Context, progress, ct), new RecurringScheduleOptions(Priority: 10, OverlapMode: RecurringOverlapMode.Skip), cancellationToken).ConfigureAwait(false); return RedirectWithMessage($"Recurring schedule 'demo:recurring' is {result}. The next occurrence will fire on the next minute boundary."); diff --git a/samples/Sheddueller.SampleHost/README.md b/samples/Sheddueller.SampleHost/README.md index 55ad326..27a8807 100644 --- a/samples/Sheddueller.SampleHost/README.md +++ b/samples/Sheddueller.SampleHost/README.md @@ -47,7 +47,7 @@ The sample applies PostgreSQL schema migrations automatically on startup and reg ## Launcher Scenarios - `Quick success`: immediate completion -- `Progress + logs`: emits durable logs and progress updates +- `Progress + logs`: emits durable logs and `IProgress` progress updates - `Retry then succeed`: fails twice, then succeeds - `Permanent failure`: terminal failure without retries - `Delayed job`: waits 30 seconds before becoming claimable diff --git a/src/Sheddueller.Testing/CapturingJobEnqueuer.cs b/src/Sheddueller.Testing/CapturingJobEnqueuer.cs index 1af5821..a1a83ec 100644 --- a/src/Sheddueller.Testing/CapturingJobEnqueuer.cs +++ b/src/Sheddueller.Testing/CapturingJobEnqueuer.cs @@ -42,6 +42,20 @@ public ValueTask EnqueueAsync( CancellationToken cancellationToken = default) => this.CurrentOrDiscardingFake().EnqueueAsync(work, submission, cancellationToken); + /// + public ValueTask EnqueueAsync( + Expression, Task>> work, + JobSubmission? submission = null, + CancellationToken cancellationToken = default) + => this.CurrentOrDiscardingFake().EnqueueAsync(work, submission, cancellationToken); + + /// + public ValueTask EnqueueAsync( + Expression, ValueTask>> work, + JobSubmission? submission = null, + CancellationToken cancellationToken = default) + => this.CurrentOrDiscardingFake().EnqueueAsync(work, submission, cancellationToken); + /// public ValueTask EnqueueAsync( Expression> work, @@ -56,6 +70,20 @@ public ValueTask EnqueueAsync( CancellationToken cancellationToken = default) => this.CurrentOrDiscardingFake().EnqueueAsync(work, submission, cancellationToken); + /// + public ValueTask EnqueueAsync( + Expression, Task>> work, + JobSubmission? submission = null, + CancellationToken cancellationToken = default) + => this.CurrentOrDiscardingFake().EnqueueAsync(work, submission, cancellationToken); + + /// + public ValueTask EnqueueAsync( + Expression, ValueTask>> work, + JobSubmission? submission = null, + CancellationToken cancellationToken = default) + => this.CurrentOrDiscardingFake().EnqueueAsync(work, submission, cancellationToken); + /// public ValueTask> EnqueueManyAsync( IReadOnlyList jobs, diff --git a/src/Sheddueller.Testing/CapturingRecurringScheduleManager.cs b/src/Sheddueller.Testing/CapturingRecurringScheduleManager.cs index 70b77c5..8118c6d 100644 --- a/src/Sheddueller.Testing/CapturingRecurringScheduleManager.cs +++ b/src/Sheddueller.Testing/CapturingRecurringScheduleManager.cs @@ -49,6 +49,24 @@ public ValueTask CreateOrUpdateAsync( CancellationToken cancellationToken = default) => this.CurrentOrDiscardingFake().CreateOrUpdateAsync(scheduleKey, cronExpression, work, options, cancellationToken); + /// + public ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, Task>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default) + => this.CurrentOrDiscardingFake().CreateOrUpdateAsync(scheduleKey, cronExpression, work, options, cancellationToken); + + /// + public ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, ValueTask>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default) + => this.CurrentOrDiscardingFake().CreateOrUpdateAsync(scheduleKey, cronExpression, work, options, cancellationToken); + /// public ValueTask CreateOrUpdateAsync( string scheduleKey, @@ -67,6 +85,24 @@ public ValueTask CreateOrUpdateAsync( CancellationToken cancellationToken = default) => this.CurrentOrDiscardingFake().CreateOrUpdateAsync(scheduleKey, cronExpression, work, options, cancellationToken); + /// + public ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, Task>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default) + => this.CurrentOrDiscardingFake().CreateOrUpdateAsync(scheduleKey, cronExpression, work, options, cancellationToken); + + /// + public ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, ValueTask>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default) + => this.CurrentOrDiscardingFake().CreateOrUpdateAsync(scheduleKey, cronExpression, work, options, cancellationToken); + /// public ValueTask TriggerAsync( string scheduleKey, diff --git a/src/Sheddueller.Testing/FakeJobEnqueuer.cs b/src/Sheddueller.Testing/FakeJobEnqueuer.cs index 5060d3a..3a7b329 100644 --- a/src/Sheddueller.Testing/FakeJobEnqueuer.cs +++ b/src/Sheddueller.Testing/FakeJobEnqueuer.cs @@ -82,6 +82,40 @@ public async ValueTask EnqueueAsync( } } + /// + public async ValueTask EnqueueAsync( + Expression, Task>> work, + JobSubmission? submission = null, + CancellationToken cancellationToken = default) + { + var preparedJob = await this.PrepareJobAsync(JobExpressionParser.Parse(work), submission, cancellationToken).ConfigureAwait(false); + + lock (this._syncRoot) + { + var recordedJob = this.CreateRecordedJob(preparedJob, batchId: null, batchIndex: null); + this._jobs.Add(recordedJob); + + return recordedJob.JobId; + } + } + + /// + public async ValueTask EnqueueAsync( + Expression, ValueTask>> work, + JobSubmission? submission = null, + CancellationToken cancellationToken = default) + { + var preparedJob = await this.PrepareJobAsync(JobExpressionParser.Parse(work), submission, cancellationToken).ConfigureAwait(false); + + lock (this._syncRoot) + { + var recordedJob = this.CreateRecordedJob(preparedJob, batchId: null, batchIndex: null); + this._jobs.Add(recordedJob); + + return recordedJob.JobId; + } + } + /// public async ValueTask EnqueueAsync( Expression> work, @@ -116,6 +150,40 @@ public async ValueTask EnqueueAsync( } } + /// + public async ValueTask EnqueueAsync( + Expression, Task>> work, + JobSubmission? submission = null, + CancellationToken cancellationToken = default) + { + var preparedJob = await this.PrepareJobAsync(JobExpressionParser.Parse(work), submission, cancellationToken).ConfigureAwait(false); + + lock (this._syncRoot) + { + var recordedJob = this.CreateRecordedJob(preparedJob, batchId: null, batchIndex: null); + this._jobs.Add(recordedJob); + + return recordedJob.JobId; + } + } + + /// + public async ValueTask EnqueueAsync( + Expression, ValueTask>> work, + JobSubmission? submission = null, + CancellationToken cancellationToken = default) + { + var preparedJob = await this.PrepareJobAsync(JobExpressionParser.Parse(work), submission, cancellationToken).ConfigureAwait(false); + + lock (this._syncRoot) + { + var recordedJob = this.CreateRecordedJob(preparedJob, batchId: null, batchIndex: null); + this._jobs.Add(recordedJob); + + return recordedJob.JobId; + } + } + /// public async ValueTask> EnqueueManyAsync( IReadOnlyList jobs, @@ -174,6 +242,22 @@ public async ValueTask MatchAsync( CancellationToken cancellationToken = default) => await this.MatchCoreAsync(JobExpressionParser.Parse(work), cancellationToken).ConfigureAwait(false); + /// + /// Finds recorded jobs that match a Task-returning job method call with scheduler-supplied progress reporting. + /// + public async ValueTask MatchAsync( + Expression, Task>> work, + CancellationToken cancellationToken = default) + => await this.MatchCoreAsync(JobExpressionParser.Parse(work), cancellationToken).ConfigureAwait(false); + + /// + /// Finds recorded jobs that match a ValueTask-returning job method call with scheduler-supplied progress reporting. + /// + public async ValueTask MatchAsync( + Expression, ValueTask>> work, + CancellationToken cancellationToken = default) + => await this.MatchCoreAsync(JobExpressionParser.Parse(work), cancellationToken).ConfigureAwait(false); + /// /// Finds recorded jobs that match a Task-returning service method call. /// @@ -190,6 +274,22 @@ public async ValueTask MatchAsync( CancellationToken cancellationToken = default) => await this.MatchCoreAsync(JobExpressionParser.Parse(work), cancellationToken).ConfigureAwait(false); + /// + /// Finds recorded jobs that match a Task-returning service method call with scheduler-supplied progress reporting. + /// + public async ValueTask MatchAsync( + Expression, Task>> work, + CancellationToken cancellationToken = default) + => await this.MatchCoreAsync(JobExpressionParser.Parse(work), cancellationToken).ConfigureAwait(false); + + /// + /// Finds recorded jobs that match a ValueTask-returning service method call with scheduler-supplied progress reporting. + /// + public async ValueTask MatchAsync( + Expression, ValueTask>> work, + CancellationToken cancellationToken = default) + => await this.MatchCoreAsync(JobExpressionParser.Parse(work), cancellationToken).ConfigureAwait(false); + /// /// Removes all recorded jobs and resets enqueue sequence numbering. /// diff --git a/src/Sheddueller.Testing/FakeRecurringScheduleManager.cs b/src/Sheddueller.Testing/FakeRecurringScheduleManager.cs index 1acf85e..aea77b0 100644 --- a/src/Sheddueller.Testing/FakeRecurringScheduleManager.cs +++ b/src/Sheddueller.Testing/FakeRecurringScheduleManager.cs @@ -98,6 +98,24 @@ public ValueTask CreateOrUpdateAsync( CancellationToken cancellationToken = default) => this.CreateOrUpdateCoreAsync(scheduleKey, cronExpression, JobExpressionParser.Parse(work), options, cancellationToken); + /// + public ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, Task>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default) + => this.CreateOrUpdateCoreAsync(scheduleKey, cronExpression, JobExpressionParser.Parse(work), options, cancellationToken); + + /// + public ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, ValueTask>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default) + => this.CreateOrUpdateCoreAsync(scheduleKey, cronExpression, JobExpressionParser.Parse(work), options, cancellationToken); + /// public ValueTask CreateOrUpdateAsync( string scheduleKey, @@ -107,6 +125,24 @@ public ValueTask CreateOrUpdateAsync( CancellationToken cancellationToken = default) => this.CreateOrUpdateCoreAsync(scheduleKey, cronExpression, JobExpressionParser.Parse(work), options, cancellationToken); + /// + public ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, Task>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default) + => this.CreateOrUpdateCoreAsync(scheduleKey, cronExpression, JobExpressionParser.Parse(work), options, cancellationToken); + + /// + public ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, ValueTask>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default) + => this.CreateOrUpdateCoreAsync(scheduleKey, cronExpression, JobExpressionParser.Parse(work), options, cancellationToken); + /// public ValueTask CreateOrUpdateAsync( string scheduleKey, @@ -252,6 +288,24 @@ public async ValueTask MatchAsync( CancellationToken cancellationToken = default) => await this.MatchCoreAsync(scheduleKey, JobExpressionParser.Parse(work), cancellationToken).ConfigureAwait(false); + /// + /// Finds recorded recurring schedules that match a Task-returning job method call with scheduler-supplied progress reporting. + /// + public async ValueTask MatchAsync( + string scheduleKey, + Expression, Task>> work, + CancellationToken cancellationToken = default) + => await this.MatchCoreAsync(scheduleKey, JobExpressionParser.Parse(work), cancellationToken).ConfigureAwait(false); + + /// + /// Finds recorded recurring schedules that match a ValueTask-returning job method call with scheduler-supplied progress reporting. + /// + public async ValueTask MatchAsync( + string scheduleKey, + Expression, ValueTask>> work, + CancellationToken cancellationToken = default) + => await this.MatchCoreAsync(scheduleKey, JobExpressionParser.Parse(work), cancellationToken).ConfigureAwait(false); + /// /// Finds recorded recurring schedules that match a Task-returning service method call. /// @@ -270,6 +324,24 @@ public async ValueTask MatchAsync( CancellationToken cancellationToken = default) => await this.MatchCoreAsync(scheduleKey, JobExpressionParser.Parse(work), cancellationToken).ConfigureAwait(false); + /// + /// Finds recorded recurring schedules that match a Task-returning service method call with scheduler-supplied progress reporting. + /// + public async ValueTask MatchAsync( + string scheduleKey, + Expression, Task>> work, + CancellationToken cancellationToken = default) + => await this.MatchCoreAsync(scheduleKey, JobExpressionParser.Parse(work), cancellationToken).ConfigureAwait(false); + + /// + /// Finds recorded recurring schedules that match a ValueTask-returning service method call with scheduler-supplied progress reporting. + /// + public async ValueTask MatchAsync( + string scheduleKey, + Expression, ValueTask>> work, + CancellationToken cancellationToken = default) + => await this.MatchCoreAsync(scheduleKey, JobExpressionParser.Parse(work), cancellationToken).ConfigureAwait(false); + /// /// Removes all recorded recurring schedules. /// diff --git a/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs b/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs index 3eaf1f6..c03127c 100644 --- a/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs +++ b/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs @@ -203,12 +203,18 @@ private async ValueTask InvokeClaimedJobAsync(ClaimedJob job, CancellationToken .GetRequiredService() .DeserializeAsync(job.SerializedArguments, serializableParameterTypes, executionToken) .ConfigureAwait(false); + var progressReporter = new DecimalJobProgressReporter( + job.JobId, + job.AttemptCount, + this._jobEventSink, + this._jobContextLogger); var invocationArguments = BuildInvocationArguments( scope.ServiceProvider, methodParameterTypes, parameterBindings, deserializedArguments, jobContext, + progressReporter, executionToken); object? result; @@ -242,6 +248,7 @@ private async ValueTask InvokeClaimedJobAsync(ClaimedJob job, CancellationToken IReadOnlyList parameterBindings, IReadOnlyList deserializedArguments, IJobContext jobContext, + DecimalJobProgressReporter progressReporter, CancellationToken executionToken) { if (methodParameterTypes.Length != parameterBindings.Count) @@ -264,6 +271,10 @@ private async ValueTask InvokeClaimedJobAsync(ClaimedJob job, CancellationToken invocationArguments[i] = jobContext; continue; + case JobMethodParameterBindingKind.ProgressReporter: + invocationArguments[i] = GetProgressReporter(methodParameterTypes[i], progressReporter); + continue; + case JobMethodParameterBindingKind.Service: invocationArguments[i] = ResolveBoundService(serviceProvider, methodParameterTypes[i], parameterBindings[i]); continue; @@ -286,6 +297,18 @@ private async ValueTask InvokeClaimedJobAsync(ClaimedJob job, CancellationToken return invocationArguments; } + private static DecimalJobProgressReporter GetProgressReporter( + Type parameterType, + DecimalJobProgressReporter progressReporter) + { + if (parameterType != typeof(IProgress)) + { + throw new InvalidOperationException($"Progress reporter parameter type '{parameterType}' is not supported."); + } + + return progressReporter; + } + private static object ResolveBoundService( IServiceProvider serviceProvider, Type parameterType, @@ -310,6 +333,32 @@ private static IReadOnlyList NormalizeParameterBindin IReadOnlyList? parameterBindings) => JobMethodParameterBindingResolver.Normalize(methodParameterTypes, parameterBindings); + [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Job progress telemetry is best-effort by v4 design.")] + private sealed class DecimalJobProgressReporter( + Guid jobId, + int attemptNumber, + IJobEventSink eventSink, + ILogger logger) : IProgress + { + public void Report(decimal value) + { + var request = new AppendJobEventRequest( + jobId, + JobEventKind.Progress, + attemptNumber, + ProgressPercent: (double)Math.Clamp(value, 0, 100)); + + try + { + eventSink.AppendAsync(request).AsTask().GetAwaiter().GetResult(); + } + catch (Exception exception) + { + logger.JobEventAppendFailed(exception, jobId); + } + } + } + private void TrackRunningJob(Task executionTask) { this._runningJobs.TryAdd(executionTask, 0); diff --git a/src/Sheddueller/Enqueueing/JobEnqueuer.cs b/src/Sheddueller/Enqueueing/JobEnqueuer.cs index c081b07..f725657 100644 --- a/src/Sheddueller/Enqueueing/JobEnqueuer.cs +++ b/src/Sheddueller/Enqueueing/JobEnqueuer.cs @@ -29,6 +29,18 @@ public ValueTask EnqueueAsync( CancellationToken cancellationToken = default) => this.EnqueueCoreAsync(JobExpressionParser.Parse(work), submission, cancellationToken); + public ValueTask EnqueueAsync( + Expression, Task>> work, + JobSubmission? submission = null, + CancellationToken cancellationToken = default) + => this.EnqueueCoreAsync(JobExpressionParser.Parse(work), submission, cancellationToken); + + public ValueTask EnqueueAsync( + Expression, ValueTask>> work, + JobSubmission? submission = null, + CancellationToken cancellationToken = default) + => this.EnqueueCoreAsync(JobExpressionParser.Parse(work), submission, cancellationToken); + public ValueTask EnqueueAsync( Expression> work, JobSubmission? submission = null, @@ -41,6 +53,18 @@ public ValueTask EnqueueAsync( CancellationToken cancellationToken = default) => this.EnqueueCoreAsync(JobExpressionParser.Parse(work), submission, cancellationToken); + public ValueTask EnqueueAsync( + Expression, Task>> work, + JobSubmission? submission = null, + CancellationToken cancellationToken = default) + => this.EnqueueCoreAsync(JobExpressionParser.Parse(work), submission, cancellationToken); + + public ValueTask EnqueueAsync( + Expression, ValueTask>> work, + JobSubmission? submission = null, + CancellationToken cancellationToken = default) + => this.EnqueueCoreAsync(JobExpressionParser.Parse(work), submission, cancellationToken); + public async ValueTask> EnqueueManyAsync( IReadOnlyList jobs, CancellationToken cancellationToken = default) diff --git a/src/Sheddueller/Enqueueing/JobExpressionParser.cs b/src/Sheddueller/Enqueueing/JobExpressionParser.cs index 6ddfd44..0ec7a70 100644 --- a/src/Sheddueller/Enqueueing/JobExpressionParser.cs +++ b/src/Sheddueller/Enqueueing/JobExpressionParser.cs @@ -10,32 +10,67 @@ public static ParsedJob Parse(Expression(Expression, TResult>> work) + { + ArgumentNullException.ThrowIfNull(work); + + return Parse(serviceType: null, work, includesProgressReporter: true); } public static ParsedJob Parse(Expression> work) { ArgumentNullException.ThrowIfNull(work); - return Parse(typeof(TService), work); + return Parse(typeof(TService), work, includesProgressReporter: false); + } + + public static ParsedJob Parse(Expression, TResult>> work) + { + ArgumentNullException.ThrowIfNull(work); + + return Parse(typeof(TService), work, includesProgressReporter: true); } public static ParsedJob Parse(Type? serviceType, LambdaExpression work) { ArgumentNullException.ThrowIfNull(work); - if (serviceType is null && (work.Parameters.Count != 1 || work.Parameters[0].Type != typeof(CancellationToken))) + return Parse(serviceType, work, IncludesProgressReporter(serviceType, work)); + } + + private static ParsedJob Parse( + Type? serviceType, + LambdaExpression work, + bool includesProgressReporter) + { + ArgumentNullException.ThrowIfNull(work); + + if (serviceType is null && !ValidateStaticExpressionParameters(work, includesProgressReporter)) { - throw new ArgumentException("Submitted work must accept the scheduler cancellation token.", nameof(work)); + throw new ArgumentException( + includesProgressReporter + ? "Submitted work must accept the scheduler cancellation token and progress reporter." + : "Submitted work must accept the scheduler cancellation token.", + nameof(work)); } - if (serviceType is not null && (work.Parameters.Count != 2 || work.Parameters[1].Type != typeof(CancellationToken))) + if (serviceType is not null && !ValidateServiceExpressionParameters(work, includesProgressReporter)) { - throw new ArgumentException("Submitted work must accept a service instance and scheduler cancellation token.", nameof(work)); + throw new ArgumentException( + includesProgressReporter + ? "Submitted work must accept a service instance, scheduler cancellation token, and progress reporter." + : "Submitted work must accept a service instance and scheduler cancellation token.", + nameof(work)); } var serviceParameter = serviceType is null ? null : work.Parameters[0]; var cancellationTokenParameter = serviceType is null ? work.Parameters[0] : work.Parameters[1]; + var progressReporterParameter = includesProgressReporter + ? serviceType is null ? work.Parameters[1] : work.Parameters[2] + : null; var body = StripConvert(work.Body); if (body is not MethodCallExpression methodCall) @@ -55,6 +90,7 @@ public static ParsedJob Parse(Type? serviceType, LambdaExpression work) var serializableParameterTypes = new List(); var parameterBindings = new List(); var forwardedCancellationToken = false; + var forwardedProgressReporter = false; for (var i = 0; i < methodParameters.Length; i++) { @@ -80,6 +116,25 @@ public static ParsedJob Parse(Type? serviceType, LambdaExpression work) continue; } + if (parameter.ParameterType == typeof(IProgress)) + { + if (progressReporterParameter is null || !IsSameParameter(argument, progressReporterParameter)) + { + throw new ArgumentException( + "IProgress target method parameters must receive the scheduler-owned progress reporter.", + nameof(work)); + } + + forwardedProgressReporter = true; + parameterBindings.Add(new JobMethodParameterBinding(JobMethodParameterBindingKind.ProgressReporter)); + continue; + } + + if (IsAnyProgressReporterType(parameter.ParameterType)) + { + throw new ArgumentException("Only IProgress is supported for scheduler-owned progress reporting.", nameof(work)); + } + if (parameter.ParameterType == typeof(IJobContext)) { if (!IsJobContextMarker(argument)) @@ -113,9 +168,14 @@ public static ParsedJob Parse(Type? serviceType, LambdaExpression work) } if ((serviceParameter is not null && ReferencesParameter(argument, serviceParameter)) - || ReferencesParameter(argument, cancellationTokenParameter)) + || ReferencesParameter(argument, cancellationTokenParameter) + || (progressReporterParameter is not null && ReferencesParameter(argument, progressReporterParameter))) { - throw new ArgumentException("Only the target service instance and scheduler cancellation token may be runtime-bound.", nameof(work)); + throw new ArgumentException( + progressReporterParameter is null + ? "Only the target service instance and scheduler cancellation token may be runtime-bound." + : "Only the target service instance, scheduler cancellation token, and progress reporter may be runtime-bound.", + nameof(work)); } ValidateSerializableParameterType(parameter.ParameterType, nameof(work)); @@ -132,6 +192,11 @@ public static ParsedJob Parse(Type? serviceType, LambdaExpression work) throw new ArgumentException("Submitted work must forward the scheduler-owned CancellationToken.", nameof(work)); } + if (progressReporterParameter is not null && !forwardedProgressReporter) + { + throw new ArgumentException("Submitted work must forward the scheduler-owned progress reporter.", nameof(work)); + } + return new ParsedJob( targetServiceType, methodCall.Method.Name, @@ -209,6 +274,37 @@ private static (Type ServiceType, JobInvocationTargetKind InvocationTargetKind) private static bool IsCompatibleServiceType(Type serviceType, Type targetType) => serviceType.IsAssignableTo(targetType) || targetType.IsAssignableTo(serviceType); + private static bool ValidateStaticExpressionParameters( + LambdaExpression work, + bool includesProgressReporter) + => includesProgressReporter + ? work.Parameters.Count == 2 + && work.Parameters[0].Type == typeof(CancellationToken) + && work.Parameters[1].Type == typeof(IProgress) + : work.Parameters.Count == 1 + && work.Parameters[0].Type == typeof(CancellationToken); + + private static bool ValidateServiceExpressionParameters( + LambdaExpression work, + bool includesProgressReporter) + => includesProgressReporter + ? work.Parameters.Count == 3 + && work.Parameters[1].Type == typeof(CancellationToken) + && work.Parameters[2].Type == typeof(IProgress) + : work.Parameters.Count == 2 + && work.Parameters[1].Type == typeof(CancellationToken); + + private static bool IncludesProgressReporter( + Type? serviceType, + LambdaExpression work) + => serviceType is null + ? work.Parameters.Count == 2 + && work.Parameters[0].Type == typeof(CancellationToken) + && work.Parameters[1].Type == typeof(IProgress) + : work.Parameters.Count == 3 + && work.Parameters[1].Type == typeof(CancellationToken) + && work.Parameters[2].Type == typeof(IProgress); + private static object? EvaluateArgument(Expression argument) { var converted = Expression.Convert(argument, typeof(object)); @@ -280,6 +376,7 @@ private static void ValidateSerializableParameterType(Type type, string paramete { if (typeof(CancellationToken).IsAssignableFrom(type) || typeof(IJobContext).IsAssignableFrom(type) + || IsAnyProgressReporterType(type) || typeof(Delegate).IsAssignableFrom(type) || typeof(Stream).IsAssignableFrom(type)) { @@ -289,12 +386,19 @@ private static void ValidateSerializableParameterType(Type type, string paramete private static void ValidateSerializableArgumentValue(object? value, string parameterName) { - if (value is CancellationToken or IJobContext or Delegate or Stream) + if (value is CancellationToken or IJobContext or Delegate or Stream + || (value is not null && IsAnyProgressReporterType(value.GetType()))) { throw new ArgumentException($"Argument value of type '{value.GetType()}' is not supported for serialized job arguments.", parameterName); } } + private static bool IsAnyProgressReporterType(Type type) + => IsProgressReporterInterface(type) || type.GetInterfaces().Any(IsProgressReporterInterface); + + private static bool IsProgressReporterInterface(Type type) + => type.IsGenericType && type.GetGenericTypeDefinition() == typeof(IProgress<>); + private sealed class ParameterReferenceVisitor(ParameterExpression parameter) : ExpressionVisitor { public bool Found { get; private set; } diff --git a/src/Sheddueller/IJobContext.cs b/src/Sheddueller/IJobContext.cs index a281d23..c5121f5 100644 --- a/src/Sheddueller/IJobContext.cs +++ b/src/Sheddueller/IJobContext.cs @@ -28,12 +28,4 @@ ValueTask LogAsync( string message, IReadOnlyDictionary? fields = null, CancellationToken cancellationToken = default); - - /// - /// Writes a durable job progress event. - /// - ValueTask ReportProgressAsync( - double? percent, - string? message = null, - CancellationToken cancellationToken = default); } diff --git a/src/Sheddueller/IJobEnqueuer.cs b/src/Sheddueller/IJobEnqueuer.cs index b51c20f..7cfef9e 100644 --- a/src/Sheddueller/IJobEnqueuer.cs +++ b/src/Sheddueller/IJobEnqueuer.cs @@ -36,6 +36,30 @@ ValueTask EnqueueAsync( JobSubmission? submission = null, CancellationToken cancellationToken = default); + /// + /// Enqueues a Task-returning job method call with scheduler-supplied progress reporting. + /// + /// The method-call expression to persist as a job. + /// Optional queueing, retry, idempotency, and metadata settings. + /// A token for canceling the enqueue operation. + /// The identifier of the queued job, or the existing queued job when generated idempotency matches. + ValueTask EnqueueAsync( + Expression, Task>> work, + JobSubmission? submission = null, + CancellationToken cancellationToken = default); + + /// + /// Enqueues a ValueTask-returning job method call with scheduler-supplied progress reporting. + /// + /// The method-call expression to persist as a job. + /// Optional queueing, retry, idempotency, and metadata settings. + /// A token for canceling the enqueue operation. + /// The identifier of the queued job, or the existing queued job when generated idempotency matches. + ValueTask EnqueueAsync( + Expression, ValueTask>> work, + JobSubmission? submission = null, + CancellationToken cancellationToken = default); + /// /// Enqueues a Task-returning service method call. /// @@ -62,6 +86,32 @@ ValueTask EnqueueAsync( JobSubmission? submission = null, CancellationToken cancellationToken = default); + /// + /// Enqueues a Task-returning service method call with scheduler-supplied progress reporting. + /// + /// The service type resolved from dependency injection when the job executes. + /// The method-call expression to persist as a job. + /// Optional queueing, retry, idempotency, and metadata settings. + /// A token for canceling the enqueue operation. + /// The identifier of the queued job, or the existing queued job when generated idempotency matches. + ValueTask EnqueueAsync( + Expression, Task>> work, + JobSubmission? submission = null, + CancellationToken cancellationToken = default); + + /// + /// Enqueues a ValueTask-returning service method call with scheduler-supplied progress reporting. + /// + /// The service type resolved from dependency injection when the job executes. + /// The method-call expression to persist as a job. + /// Optional queueing, retry, idempotency, and metadata settings. + /// A token for canceling the enqueue operation. + /// The identifier of the queued job, or the existing queued job when generated idempotency matches. + ValueTask EnqueueAsync( + Expression, ValueTask>> work, + JobSubmission? submission = null, + CancellationToken cancellationToken = default); + /// /// Atomically enqueues multiple service method calls. /// diff --git a/src/Sheddueller/IRecurringScheduleManager.cs b/src/Sheddueller/IRecurringScheduleManager.cs index abb33b7..fdcc28f 100644 --- a/src/Sheddueller/IRecurringScheduleManager.cs +++ b/src/Sheddueller/IRecurringScheduleManager.cs @@ -43,6 +43,38 @@ ValueTask CreateOrUpdateAsync( RecurringScheduleOptions? options = null, CancellationToken cancellationToken = default); + /// + /// Creates or replaces a Task-returning recurring schedule definition with scheduler-supplied progress reporting. + /// + /// The stable unique key for the schedule. + /// A standard five-field cron expression evaluated in UTC. + /// The method-call expression to materialize when an occurrence is due. + /// Options applied to jobs created by the schedule. + /// A token for canceling the storage operation. + /// Whether the schedule was created, changed, or already matched the submitted definition. + ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, Task>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default); + + /// + /// Creates or replaces a ValueTask-returning recurring schedule definition with scheduler-supplied progress reporting. + /// + /// The stable unique key for the schedule. + /// A standard five-field cron expression evaluated in UTC. + /// The method-call expression to materialize when an occurrence is due. + /// Options applied to jobs created by the schedule. + /// A token for canceling the storage operation. + /// Whether the schedule was created, changed, or already matched the submitted definition. + ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, ValueTask>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default); + /// /// Creates or replaces a Task-returning recurring schedule definition. /// @@ -77,6 +109,40 @@ ValueTask CreateOrUpdateAsync( RecurringScheduleOptions? options = null, CancellationToken cancellationToken = default); + /// + /// Creates or replaces a Task-returning recurring schedule definition with scheduler-supplied progress reporting. + /// + /// The service type resolved from dependency injection when an occurrence runs. + /// The stable unique key for the schedule. + /// A standard five-field cron expression evaluated in UTC. + /// The method-call expression to materialize when an occurrence is due. + /// Options applied to jobs created by the schedule. + /// A token for canceling the storage operation. + /// Whether the schedule was created, changed, or already matched the submitted definition. + ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, Task>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default); + + /// + /// Creates or replaces a ValueTask-returning recurring schedule definition with scheduler-supplied progress reporting. + /// + /// The service type resolved from dependency injection when an occurrence runs. + /// The stable unique key for the schedule. + /// A standard five-field cron expression evaluated in UTC. + /// The method-call expression to materialize when an occurrence is due. + /// Options applied to jobs created by the schedule. + /// A token for canceling the storage operation. + /// Whether the schedule was created, changed, or already matched the submitted definition. + ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, ValueTask>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default); + /// /// Manually triggers a recurring schedule by cloning its current stored template into one queued job. /// diff --git a/src/Sheddueller/Inspection/Jobs/JobInvocationDisplayFormatter.cs b/src/Sheddueller/Inspection/Jobs/JobInvocationDisplayFormatter.cs index 5522c8e..ba94347 100644 --- a/src/Sheddueller/Inspection/Jobs/JobInvocationDisplayFormatter.cs +++ b/src/Sheddueller/Inspection/Jobs/JobInvocationDisplayFormatter.cs @@ -55,6 +55,7 @@ private static string FormatArgument(JobInvocationParameterInspection parameter) JobMethodParameterBindingKind.Serialized => FormatSerializedArgument(parameter), JobMethodParameterBindingKind.CancellationToken => "CancellationToken", JobMethodParameterBindingKind.JobContext => "Job.Context", + JobMethodParameterBindingKind.ProgressReporter => "IProgress", JobMethodParameterBindingKind.Service => string.Create( CultureInfo.InvariantCulture, $"Job.Resolve<{ShortTypeName(parameter.Binding.ServiceType ?? parameter.ParameterType)}>()"), diff --git a/src/Sheddueller/JobEnqueueItem.cs b/src/Sheddueller/JobEnqueueItem.cs index da2a429..9958b96 100644 --- a/src/Sheddueller/JobEnqueueItem.cs +++ b/src/Sheddueller/JobEnqueueItem.cs @@ -53,6 +53,36 @@ public static JobEnqueueItem Create( return new JobEnqueueItem(serviceType: null, work, submission); } + /// + /// Creates a batch item for a Task-returning job method call with scheduler-supplied progress reporting. + /// + /// The method-call expression to persist as a job. + /// Optional queueing, retry, idempotency, and metadata settings. + /// A batch enqueue item. + public static JobEnqueueItem Create( + Expression, Task>> work, + JobSubmission? submission = null) + { + ArgumentNullException.ThrowIfNull(work); + + return new JobEnqueueItem(serviceType: null, work, submission); + } + + /// + /// Creates a batch item for a ValueTask-returning job method call with scheduler-supplied progress reporting. + /// + /// The method-call expression to persist as a job. + /// Optional queueing, retry, idempotency, and metadata settings. + /// A batch enqueue item. + public static JobEnqueueItem Create( + Expression, ValueTask>> work, + JobSubmission? submission = null) + { + ArgumentNullException.ThrowIfNull(work); + + return new JobEnqueueItem(serviceType: null, work, submission); + } + /// /// Creates a batch item for a Task-returning service method call. /// @@ -84,4 +114,36 @@ public static JobEnqueueItem Create( return new JobEnqueueItem(typeof(TService), work, submission); } + + /// + /// Creates a batch item for a Task-returning service method call with scheduler-supplied progress reporting. + /// + /// The service type resolved from dependency injection when the job executes. + /// The method-call expression to persist as a job. + /// Optional queueing, retry, idempotency, and metadata settings. + /// A batch enqueue item. + public static JobEnqueueItem Create( + Expression, Task>> work, + JobSubmission? submission = null) + { + ArgumentNullException.ThrowIfNull(work); + + return new JobEnqueueItem(typeof(TService), work, submission); + } + + /// + /// Creates a batch item for a ValueTask-returning service method call with scheduler-supplied progress reporting. + /// + /// The service type resolved from dependency injection when the job executes. + /// The method-call expression to persist as a job. + /// Optional queueing, retry, idempotency, and metadata settings. + /// A batch enqueue item. + public static JobEnqueueItem Create( + Expression, ValueTask>> work, + JobSubmission? submission = null) + { + ArgumentNullException.ThrowIfNull(work); + + return new JobEnqueueItem(typeof(TService), work, submission); + } } diff --git a/src/Sheddueller/Runtime/JobContext.cs b/src/Sheddueller/Runtime/JobContext.cs index 4b58e32..8ab6a06 100644 --- a/src/Sheddueller/Runtime/JobContext.cs +++ b/src/Sheddueller/Runtime/JobContext.cs @@ -34,27 +34,6 @@ await this.AppendBestEffortAsync( .ConfigureAwait(false); } - public async ValueTask ReportProgressAsync( - double? percent, - string? message = null, - CancellationToken cancellationToken = default) - { - if (percent is < 0 or > 100) - { - throw new ArgumentOutOfRangeException(nameof(percent), percent, "Progress percent must be between 0 and 100."); - } - - await this.AppendBestEffortAsync( - new AppendJobEventRequest( - this.JobId, - JobEventKind.Progress, - this.AttemptNumber, - Message: message, - ProgressPercent: percent), - cancellationToken) - .ConfigureAwait(false); - } - [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Job-context telemetry is best-effort by v4 design.")] private async ValueTask AppendBestEffortAsync( AppendJobEventRequest request, diff --git a/src/Sheddueller/Runtime/RecurringScheduleManager.cs b/src/Sheddueller/Runtime/RecurringScheduleManager.cs index 8704988..560e9ab 100644 --- a/src/Sheddueller/Runtime/RecurringScheduleManager.cs +++ b/src/Sheddueller/Runtime/RecurringScheduleManager.cs @@ -35,6 +35,22 @@ public ValueTask CreateOrUpdateAsync( CancellationToken cancellationToken = default) => this.CreateOrUpdateCoreAsync(scheduleKey, cronExpression, work, options, cancellationToken); + public ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, Task>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default) + => this.CreateOrUpdateCoreAsync(scheduleKey, cronExpression, work, options, cancellationToken); + + public ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, ValueTask>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default) + => this.CreateOrUpdateCoreAsync(scheduleKey, cronExpression, work, options, cancellationToken); + public ValueTask CreateOrUpdateAsync( string scheduleKey, string cronExpression, @@ -51,6 +67,22 @@ public ValueTask CreateOrUpdateAsync( CancellationToken cancellationToken = default) => this.CreateOrUpdateCoreAsync(scheduleKey, cronExpression, work, options, cancellationToken); + public ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, Task>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default) + => this.CreateOrUpdateCoreAsync(scheduleKey, cronExpression, work, options, cancellationToken); + + public ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, ValueTask>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default) + => this.CreateOrUpdateCoreAsync(scheduleKey, cronExpression, work, options, cancellationToken); + public async ValueTask TriggerAsync( string scheduleKey, CancellationToken cancellationToken = default) @@ -135,6 +167,26 @@ private async ValueTask CreateOrUpdateCoreAsync CreateOrUpdateCoreAsync( + string scheduleKey, + string cronExpression, + Expression, TResult>> work, + RecurringScheduleOptions? options, + CancellationToken cancellationToken) + { + SubmissionValidator.ValidateScheduleKey(scheduleKey); + CronSchedule.Validate(cronExpression); + ArgumentNullException.ThrowIfNull(work); + + return await this.CreateOrUpdateCoreAsync( + scheduleKey, + cronExpression, + JobExpressionParser.Parse(work), + options, + cancellationToken) + .ConfigureAwait(false); + } + private async ValueTask CreateOrUpdateCoreAsync( string scheduleKey, string cronExpression, @@ -155,6 +207,26 @@ private async ValueTask CreateOrUpdateCoreAsync CreateOrUpdateCoreAsync( + string scheduleKey, + string cronExpression, + Expression, TResult>> work, + RecurringScheduleOptions? options, + CancellationToken cancellationToken) + { + SubmissionValidator.ValidateScheduleKey(scheduleKey); + CronSchedule.Validate(cronExpression); + ArgumentNullException.ThrowIfNull(work); + + return await this.CreateOrUpdateCoreAsync( + scheduleKey, + cronExpression, + JobExpressionParser.Parse(work), + options, + cancellationToken) + .ConfigureAwait(false); + } + private async ValueTask CreateOrUpdateCoreAsync( string scheduleKey, string cronExpression, diff --git a/src/Sheddueller/Storage/JobMethodParameterBindingKind.cs b/src/Sheddueller/Storage/JobMethodParameterBindingKind.cs index 8170fac..5b8617e 100644 --- a/src/Sheddueller/Storage/JobMethodParameterBindingKind.cs +++ b/src/Sheddueller/Storage/JobMethodParameterBindingKind.cs @@ -24,4 +24,9 @@ public enum JobMethodParameterBindingKind /// The parameter value is resolved from dependency injection at execution time. /// Service = 3, + + /// + /// The parameter value is the scheduler-owned job progress reporter. + /// + ProgressReporter = 4, } diff --git a/src/Sheddueller/Storage/JobMethodParameterBindingResolver.cs b/src/Sheddueller/Storage/JobMethodParameterBindingResolver.cs index b33d533..e4a0995 100644 --- a/src/Sheddueller/Storage/JobMethodParameterBindingResolver.cs +++ b/src/Sheddueller/Storage/JobMethodParameterBindingResolver.cs @@ -47,6 +47,7 @@ private static JobMethodParameterBinding CreateInferredBinding(Type parameterTyp { Type type when type == typeof(CancellationToken) => new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken), Type type when type == typeof(IJobContext) => new JobMethodParameterBinding(JobMethodParameterBindingKind.JobContext), + Type type when type == typeof(IProgress) => new JobMethodParameterBinding(JobMethodParameterBindingKind.ProgressReporter), _ => new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized), }; @@ -55,5 +56,7 @@ private static JobMethodParameterBinding CreateInferredBinding(string parameterT ? new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken) : string.Equals(parameterType, typeof(IJobContext).AssemblyQualifiedName, StringComparison.Ordinal) ? new JobMethodParameterBinding(JobMethodParameterBindingKind.JobContext) - : new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized); + : string.Equals(parameterType, typeof(IProgress).AssemblyQualifiedName, StringComparison.Ordinal) + ? new JobMethodParameterBinding(JobMethodParameterBindingKind.ProgressReporter) + : new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized); } diff --git a/test/Sheddueller.Dashboard.Tests/DashboardEndpointTests.cs b/test/Sheddueller.Dashboard.Tests/DashboardEndpointTests.cs index 6852435..c2d0cd7 100644 --- a/test/Sheddueller.Dashboard.Tests/DashboardEndpointTests.cs +++ b/test/Sheddueller.Dashboard.Tests/DashboardEndpointTests.cs @@ -1014,6 +1014,22 @@ public ValueTask CreateOrUpdateAsync( CancellationToken cancellationToken = default) => throw new NotSupportedException(); + public ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, Task>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, ValueTask>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + public ValueTask CreateOrUpdateAsync( string scheduleKey, string cronExpression, @@ -1030,6 +1046,22 @@ public ValueTask CreateOrUpdateAsync( CancellationToken cancellationToken = default) => throw new NotSupportedException(); + public ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, Task>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask CreateOrUpdateAsync( + string scheduleKey, + string cronExpression, + Expression, ValueTask>> work, + RecurringScheduleOptions? options = null, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + public ValueTask DeleteAsync( string scheduleKey, CancellationToken cancellationToken = default) diff --git a/test/Sheddueller.Testing.Tests/FakeJobEnqueuerTests.cs b/test/Sheddueller.Testing.Tests/FakeJobEnqueuerTests.cs index 3a392c9..3803f4b 100644 --- a/test/Sheddueller.Testing.Tests/FakeJobEnqueuerTests.cs +++ b/test/Sheddueller.Testing.Tests/FakeJobEnqueuerTests.cs @@ -79,6 +79,30 @@ await fake.EnqueueAsync( match[0].InvocationTargetKind.ShouldBe(JobInvocationTargetKind.Static); } + [Fact] + public async Task Enqueue_ProgressAwareServiceMethod_RecordsJobAndMatchesByExpression() + { + var fake = new FakeJobEnqueuer(); + var payload = new SamplePayload("alpha", 42); + + var jobId = await fake.EnqueueAsync( + (s, ct, progress) => s.HandleWithProgressAsync(payload, progress, ct)); + + var match = await fake.MatchAsync( + (s, ct, progress) => s.HandleWithProgressAsync(new SamplePayload("alpha", 42), progress, ct)); + + match.Count.ShouldBe(1); + match[0].JobId.ShouldBe(jobId); + match[0].MethodName.ShouldBe(nameof(TestJobService.HandleWithProgressAsync)); + match[0].MethodParameterTypes.ShouldBe([typeof(SamplePayload), typeof(IProgress), typeof(CancellationToken)]); + match[0].MethodParameterBindings.ShouldBe([ + new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized), + new JobMethodParameterBinding(JobMethodParameterBindingKind.ProgressReporter), + new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken), + ]); + match[0].SerializableArguments.ShouldBe([payload]); + } + [Fact] public async Task EnqueueMany_MixedJobs_RecordsBatchMetadataAndReturnsIdsInInputOrder() { @@ -176,6 +200,9 @@ public Task HandleAsync(SamplePayload payload, CancellationToken cancellationTok public Task HandleStringAsync(string value, CancellationToken cancellationToken) => Task.CompletedTask; + public Task HandleWithProgressAsync(SamplePayload payload, IProgress progress, CancellationToken cancellationToken) + => Task.CompletedTask; + public Task NoTokenAsync() => Task.CompletedTask; diff --git a/test/Sheddueller.Testing.Tests/FakeRecurringScheduleManagerTests.cs b/test/Sheddueller.Testing.Tests/FakeRecurringScheduleManagerTests.cs index f3a3388..f480751 100644 --- a/test/Sheddueller.Testing.Tests/FakeRecurringScheduleManagerTests.cs +++ b/test/Sheddueller.Testing.Tests/FakeRecurringScheduleManagerTests.cs @@ -3,6 +3,7 @@ namespace Sheddueller.Testing.Tests; using Microsoft.Extensions.Time.Testing; using Sheddueller.Serialization; +using Sheddueller.Storage; using Shouldly; @@ -58,7 +59,31 @@ await fake.CreateOrUpdateAsync( (s, ct) => s.HandleAsync(new SamplePayload("alpha", 42), ct))).ShouldBeEmpty(); (await fake.MatchAsync( "schedule-a", - (s, ct) => s.HandleAsync(new SamplePayload("alpha", 43), ct))).ShouldBeEmpty(); + (s, ct) => s.HandleAsync(new SamplePayload("alpha", 43), ct))).ShouldBeEmpty(); + } + + [Fact] + public async Task CreateOrUpdate_ProgressAwareServiceMethod_RecordsScheduleAndMatchesByExpression() + { + var fake = new FakeRecurringScheduleManager(); + var payload = new SamplePayload("alpha", 42); + + await fake.CreateOrUpdateAsync( + "schedule-a", + "* * * * *", + (s, ct, progress) => s.HandleWithProgressAsync(payload, progress, ct)); + + var match = await fake.MatchAsync( + "schedule-a", + (s, ct, progress) => s.HandleWithProgressAsync(new SamplePayload("alpha", 42), progress, ct)); + + match.Count.ShouldBe(1); + match[0].MethodParameterTypes.ShouldBe([typeof(SamplePayload), typeof(IProgress), typeof(CancellationToken)]); + match[0].MethodParameterBindings.ShouldBe([ + new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized), + new JobMethodParameterBinding(JobMethodParameterBindingKind.ProgressReporter), + new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken), + ]); } [Fact] @@ -236,6 +261,9 @@ public Task HandleAsync(SamplePayload payload, CancellationToken cancellationTok public Task HandleStringAsync(string value, CancellationToken cancellationToken) => Task.CompletedTask; + + public Task HandleWithProgressAsync(SamplePayload payload, IProgress progress, CancellationToken cancellationToken) + => Task.CompletedTask; } private sealed class ConstantPayloadSerializer : IJobPayloadSerializer diff --git a/test/Sheddueller.Tests/JobEnqueuerTests.cs b/test/Sheddueller.Tests/JobEnqueuerTests.cs index 32a688c..0376fac 100644 --- a/test/Sheddueller.Tests/JobEnqueuerTests.cs +++ b/test/Sheddueller.Tests/JobEnqueuerTests.cs @@ -66,6 +66,60 @@ public async Task Enqueue_JobContextAwareMethod_PersistsMethodIdentityWithoutSer arguments[0].ShouldBe(payload); } + [Fact] + public async Task Enqueue_ProgressAwareMethod_PersistsMethodIdentityWithoutSerializingProgress() + { + using var provider = CreateProvider(); + var enqueuer = provider.GetRequiredService(); + var store = provider.GetRequiredService(); + var payload = new SamplePayload("alpha", 42); + + var jobId = await enqueuer.EnqueueAsync( + (service, cancellationToken, progress) => service.HandleWithProgressAsync(payload, progress, cancellationToken)); + + var request = store.GetRequest(jobId); + request.MethodParameterTypes.ShouldBe([ + typeof(SamplePayload).AssemblyQualifiedName!, + typeof(IProgress).AssemblyQualifiedName!, + typeof(CancellationToken).AssemblyQualifiedName!, + ]); + request.MethodParameterBindings.ShouldBe([ + new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized), + new JobMethodParameterBinding(JobMethodParameterBindingKind.ProgressReporter), + new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken), + ]); + + var arguments = await provider.GetRequiredService() + .DeserializeAsync(request.SerializedArguments, [typeof(SamplePayload)]); + + arguments.ShouldBe([payload]); + } + + [Fact] + public async Task Enqueue_StaticProgressAwareMethod_PersistsStaticInvocationMetadata() + { + using var provider = CreateProvider(); + var enqueuer = provider.GetRequiredService(); + var store = provider.GetRequiredService(); + + var jobId = await enqueuer.EnqueueAsync( + (cancellationToken, progress) => EnqueueTestService.StaticWithProgressAsync(progress, cancellationToken)); + + var request = store.GetRequest(jobId); + request.ServiceType.ShouldBe(typeof(EnqueueTestService).AssemblyQualifiedName); + request.MethodName.ShouldBe(nameof(EnqueueTestService.StaticWithProgressAsync)); + request.InvocationTargetKind.ShouldBe(JobInvocationTargetKind.Static); + request.MethodParameterBindings.ShouldBe([ + new JobMethodParameterBinding(JobMethodParameterBindingKind.ProgressReporter), + new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken), + ]); + + var arguments = await provider.GetRequiredService() + .DeserializeAsync(request.SerializedArguments, []); + + arguments.ShouldBeEmpty(); + } + [Fact] public async Task Enqueue_StaticMethodCall_PersistsStaticInvocationMetadata() { @@ -227,6 +281,27 @@ public async Task EnqueueMany_MixedServiceMethods_PersistsJobsAndReturnsIdsInInp thirdRequest.InvocationTargetKind.ShouldBe(JobInvocationTargetKind.Static); } + [Fact] + public async Task EnqueueMany_ProgressAwareMethod_PersistsProgressBinding() + { + using var provider = CreateProvider(); + var enqueuer = provider.GetRequiredService(); + var store = provider.GetRequiredService(); + var payload = new SamplePayload("alpha", 42); + + var jobIds = await enqueuer.EnqueueManyAsync([ + JobEnqueueItem.Create( + (service, cancellationToken, progress) => service.HandleWithProgressAsync(payload, progress, cancellationToken)), + ]); + + var request = store.GetRequest(jobIds[0]); + request.MethodParameterBindings.ShouldBe([ + new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized), + new JobMethodParameterBinding(JobMethodParameterBindingKind.ProgressReporter), + new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken), + ]); + } + [Fact] public async Task EnqueueMany_EmptyBatch_ReturnsEmptyAndDoesNotPersistJob() { @@ -392,6 +467,24 @@ await Should.ThrowAsync( () => enqueuer.EnqueueAsync( (service, cancellationToken) => service.HandleObjectAsync(Job.Context, cancellationToken)).AsTask()); + await Should.ThrowAsync( + () => enqueuer.EnqueueAsync( + (service, cancellationToken, progress) => service.HandleAsync(payload, cancellationToken)).AsTask()); + + await Should.ThrowAsync( + () => enqueuer.EnqueueAsync( + (service, cancellationToken, progress) => service.HandleObjectAsync(progress, cancellationToken)).AsTask()); + + var capturedProgress = new Progress(); + await Should.ThrowAsync( + () => enqueuer.EnqueueAsync( + (service, cancellationToken) => service.HandleWithProgressAsync(payload, capturedProgress, cancellationToken)).AsTask()); + + var capturedIntProgress = new Progress(); + await Should.ThrowAsync( + () => enqueuer.EnqueueAsync( + (service, cancellationToken) => service.HandleIntProgressAsync(capturedIntProgress, cancellationToken)).AsTask()); + await Should.ThrowAsync( () => enqueuer.EnqueueAsync( cancellationToken => new EnqueueTestService().HandleStringAsync("new-target", cancellationToken)).AsTask()); @@ -449,6 +542,16 @@ public Task HandleWithContextAsync(SamplePayload payload, IJobContext jobContext return Task.CompletedTask; } + public Task HandleWithProgressAsync(SamplePayload payload, IProgress progress, CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public Task HandleIntProgressAsync(IProgress progress, CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + public Task HandleObjectAsync(object value, CancellationToken cancellationToken) { return Task.CompletedTask; @@ -478,6 +581,11 @@ public static Task StaticWithPayloadAsync(SamplePayload payload, CancellationTok { return Task.CompletedTask; } + + public static Task StaticWithProgressAsync(IProgress progress, CancellationToken cancellationToken) + { + return Task.CompletedTask; + } } private sealed class SecondEnqueueTestService @@ -504,11 +612,5 @@ public ValueTask LogAsync( IReadOnlyDictionary? fields = null, CancellationToken cancellationToken = default) => ValueTask.CompletedTask; - - public ValueTask ReportProgressAsync( - double? percent, - string? message = null, - CancellationToken cancellationToken = default) - => ValueTask.CompletedTask; } } diff --git a/test/Sheddueller.Tests/JobInvocationDisplayFormatterTests.cs b/test/Sheddueller.Tests/JobInvocationDisplayFormatterTests.cs index 1144edc..b4a88b7 100644 --- a/test/Sheddueller.Tests/JobInvocationDisplayFormatterTests.cs +++ b/test/Sheddueller.Tests/JobInvocationDisplayFormatterTests.cs @@ -58,6 +58,22 @@ public void Format_ThreeArguments_UsesMultilineLayout() " CancellationToken)")); } + [Fact] + public void Format_ProgressReporterBinding_DisplaysClrProgressMarker() + { + var call = JobInvocationDisplayFormatter.Format( + typeof(NestedService).AssemblyQualifiedName!, + "Run", + [ + new JobInvocationParameterInspection( + 0, + typeof(IProgress).AssemblyQualifiedName!, + new JobMethodParameterBinding(JobMethodParameterBindingKind.ProgressReporter)), + ]); + + call.ShouldBe("NestedService.Run(IProgress)"); + } + private sealed class NestedService { } diff --git a/test/Sheddueller.Worker.Tests/WorkerProgressTests.cs b/test/Sheddueller.Worker.Tests/WorkerProgressTests.cs new file mode 100644 index 0000000..105c531 --- /dev/null +++ b/test/Sheddueller.Worker.Tests/WorkerProgressTests.cs @@ -0,0 +1,286 @@ +namespace Sheddueller.Worker.Tests; + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +using Sheddueller.Serialization; +using Sheddueller.Storage; +using Sheddueller.Worker.Internal; + +using Shouldly; + +public sealed class WorkerProgressTests +{ + [Fact] + public async Task JobExecution_ProgressReporter_RecordsProgressEvent() + { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var job = CreateClaimedJob(nameof(ProgressJob.ReportProgress)); + var store = new SingleClaimJobStore(job); + var eventSink = new RecordingJobEventSink(); + await using var provider = CreateProvider(store, eventSink); + var worker = provider.GetServices().OfType().Single(); + + await worker.StartAsync(cancellationTokenSource.Token); + await store.Completed.Task.WaitAsync(cancellationTokenSource.Token); + await worker.StopAsync(cancellationTokenSource.Token); + + var request = eventSink.Requests.ShouldHaveSingleItem(); + request.JobId.ShouldBe(job.JobId); + request.Kind.ShouldBe(JobEventKind.Progress); + request.AttemptNumber.ShouldBe(job.AttemptCount); + request.ProgressPercent.ShouldBe(42.5); + request.Message.ShouldBeNull(); + } + + [Fact] + public async Task JobExecution_ProgressReporterOutOfRange_ClampsProgressEvent() + { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var job = CreateClaimedJob(nameof(ProgressJob.ReportOutOfRangeAsync)); + var store = new SingleClaimJobStore(job); + var eventSink = new RecordingJobEventSink(); + await using var provider = CreateProvider(store, eventSink); + var worker = provider.GetServices().OfType().Single(); + + await worker.StartAsync(cancellationTokenSource.Token); + await store.Completed.Task.WaitAsync(cancellationTokenSource.Token); + await worker.StopAsync(cancellationTokenSource.Token); + + eventSink.Requests.Select(request => request.ProgressPercent).ShouldBe([100, 0]); + } + + private static ServiceProvider CreateProvider( + SingleClaimJobStore store, + RecordingJobEventSink eventSink) + { + var services = new ServiceCollection(); + services.AddSingleton(eventSink); + services.AddSingleton(serviceProvider => serviceProvider.GetRequiredService()); + services.AddSingleton(store); + services.AddSingleton(serviceProvider => serviceProvider.GetRequiredService()); + services.AddTransient(); + services.AddShedduellerWorker(builder => builder.ConfigureOptions(options => + { + options.NodeId = "worker-progress"; + options.IdlePollingInterval = TimeSpan.FromMilliseconds(10); + options.HeartbeatInterval = TimeSpan.FromSeconds(5); + options.LeaseDuration = TimeSpan.FromSeconds(30); + })); + return services.BuildServiceProvider(); + } + + private static ClaimedJob CreateClaimedJob(string methodName) + => new( + Guid.NewGuid(), + EnqueueSequence: 1, + Priority: 0, + ServiceType: typeof(ProgressJob).AssemblyQualifiedName!, + MethodName: methodName, + MethodParameterTypes: + [ + typeof(IProgress).AssemblyQualifiedName!, + typeof(CancellationToken).AssemblyQualifiedName!, + ], + SerializedArguments: new SerializedJobPayload(SystemTextJsonJobPayloadSerializer.JsonContentType, "[]"u8.ToArray()), + ConcurrencyGroupKeys: [], + AttemptCount: 1, + MaxAttempts: 1, + LeaseToken: Guid.NewGuid(), + LeaseExpiresAtUtc: DateTimeOffset.UtcNow.AddSeconds(30), + RetryBackoffKind: null, + RetryBaseDelay: null, + RetryMaxDelay: null, + SourceScheduleKey: null, + ScheduledFireAtUtc: null, + MethodParameterBindings: + [ + new JobMethodParameterBinding(JobMethodParameterBindingKind.ProgressReporter), + new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken), + ]); + + private sealed class ProgressJob + { + public Task ReportProgress(IProgress progress, CancellationToken cancellationToken) + { + progress.Report(42.5m); + return Task.CompletedTask; + } + + public Task ReportOutOfRangeAsync(IProgress progress, CancellationToken cancellationToken) + { + progress.Report(100.1m); + progress.Report(-0.1m); + return Task.CompletedTask; + } + } + + private sealed class RecordingJobEventSink : IJobEventSink + { + private readonly Lock _syncRoot = new(); + private readonly List _requests = []; + + public IReadOnlyList Requests + { + get + { + lock (this._syncRoot) + { + return Array.AsReadOnly([.. this._requests]); + } + } + } + + public ValueTask AppendAsync( + AppendJobEventRequest request, + CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + lock (this._syncRoot) + { + this._requests.Add(request); + return ValueTask.FromResult(new JobEvent( + Guid.NewGuid(), + request.JobId, + this._requests.Count, + request.Kind, + DateTimeOffset.UtcNow, + request.AttemptNumber, + request.LogLevel, + request.Message, + request.ProgressPercent, + request.Fields)); + } + } + } + + private sealed class SingleClaimJobStore(ClaimedJob job) : IJobStore + { + private int _claimed; + + public TaskCompletionSource Completed { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public TaskCompletionSource Failed { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public ValueTask EnqueueAsync( + EnqueueJobRequest request, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask> EnqueueManyAsync( + IReadOnlyList requests, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask TryClaimNextAsync( + ClaimJobRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult( + Interlocked.Exchange(ref this._claimed, 1) == 0 + ? new ClaimJobResult.Claimed(job) + : new ClaimJobResult.NoJobAvailable()); + + public ValueTask MarkCompletedAsync( + CompleteJobRequest request, + CancellationToken cancellationToken = default) + { + this.Completed.TrySetResult(request); + return ValueTask.FromResult(true); + } + + public ValueTask MarkFailedAsync( + FailJobRequest request, + CancellationToken cancellationToken = default) + { + this.Failed.TrySetResult(request); + return ValueTask.FromResult(true); + } + + public ValueTask RenewLeaseAsync( + RenewLeaseRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(true); + + public ValueTask ReleaseJobAsync( + ReleaseJobRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(true); + + public ValueTask RecoverExpiredLeasesAsync( + RecoverExpiredLeasesRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(0); + + public ValueTask CancelAsync( + CancelJobRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(JobCancellationResult.NotFound); + + public ValueTask GetCancellationRequestedAtAsync( + JobCancellationStatusRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(null); + + public ValueTask MarkCancellationObservedAsync( + ObserveJobCancellationRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(true); + + public ValueTask RecordWorkerNodeHeartbeatAsync( + WorkerNodeHeartbeatRequest request, + CancellationToken cancellationToken = default) + => ValueTask.CompletedTask; + + public ValueTask SetConcurrencyLimitAsync( + SetConcurrencyLimitRequest request, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask GetConfiguredConcurrencyLimitAsync( + string groupKey, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask CreateOrUpdateRecurringScheduleAsync( + UpsertRecurringScheduleRequest request, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask TriggerRecurringScheduleAsync( + TriggerRecurringScheduleRequest request, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask DeleteRecurringScheduleAsync( + string scheduleKey, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask PauseRecurringScheduleAsync( + string scheduleKey, + DateTimeOffset pausedAtUtc, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask ResumeRecurringScheduleAsync( + string scheduleKey, + DateTimeOffset resumedAtUtc, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask GetRecurringScheduleAsync( + string scheduleKey, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask> ListRecurringSchedulesAsync( + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask MaterializeDueRecurringSchedulesAsync( + MaterializeDueRecurringSchedulesRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(0); + } +} From d087cc2c591439b15c273d6265891169198cd87d Mon Sep 17 00:00:00 2001 From: Jack Bird Date: Mon, 27 Apr 2026 15:05:45 +0100 Subject: [PATCH 2/2] feat: capture log output from ILogger --- README.md | 26 +- .../DemoJobs/DemoJobService.cs | 95 ++- .../LauncherPageRenderer.cs | 2 +- samples/Sheddueller.SampleHost/Program.cs | 16 +- samples/Sheddueller.SampleHost/README.md | 2 +- .../Internal/CapturedJobLogContext.cs | 6 + .../Internal/JobLogCaptureContext.cs | 50 ++ .../ShedduellerJobLogEventDispatcher.cs | 81 +++ .../Internal/ShedduellerJobLogEventQueue.cs | 26 + .../Internal/ShedduellerJobLoggerProvider.cs | 150 +++++ .../Internal/ShedduellerWorker.cs | 15 +- ...uellerWorkerServiceCollectionExtensions.cs | 4 + src/Sheddueller/IJobContext.cs | 9 - src/Sheddueller/Runtime/JobContext.cs | 64 -- .../JobContextLoggingTests.cs | 44 -- test/Sheddueller.Tests/JobEnqueuerTests.cs | 7 - .../RegistrationTests.cs | 3 +- .../WorkerJobLoggerTests.cs | 561 ++++++++++++++++++ 18 files changed, 996 insertions(+), 165 deletions(-) create mode 100644 src/Sheddueller.Worker/Internal/CapturedJobLogContext.cs create mode 100644 src/Sheddueller.Worker/Internal/JobLogCaptureContext.cs create mode 100644 src/Sheddueller.Worker/Internal/ShedduellerJobLogEventDispatcher.cs create mode 100644 src/Sheddueller.Worker/Internal/ShedduellerJobLogEventQueue.cs create mode 100644 src/Sheddueller.Worker/Internal/ShedduellerJobLoggerProvider.cs delete mode 100644 test/Sheddueller.Tests/JobContextLoggingTests.cs create mode 100644 test/Sheddueller.Worker.Tests/WorkerJobLoggerTests.cs diff --git a/README.md b/README.md index 36e2088..8b72a96 100644 --- a/README.md +++ b/README.md @@ -120,25 +120,26 @@ Use `UsePostgres(postgres => postgres.DataSource = dataSource)` when an applicat ## Enqueue Jobs -Job methods return `Task` or `ValueTask` and receive the scheduler-owned `CancellationToken`. Use `Job.Context` when a handler needs durable logs, the job id, or the attempt number. Use scheduler-supplied `IProgress` for durable progress updates. +Job methods return `Task` or `ValueTask` and receive the scheduler-owned `CancellationToken`. Use constructor-injected `ILogger` for durable job logs, `Job.Context` when a handler needs the job id or attempt number, and scheduler-supplied `IProgress` for durable progress updates. ```csharp -public sealed class EmailJobs +using Microsoft.Extensions.Logging; + +public sealed class EmailJobs(ILogger logger) { public async Task SendWelcomeAsync( Guid userId, - IJobContext job, IProgress progress, CancellationToken cancellationToken) { - await job.LogAsync(JobLogLevel.Information, "Sending welcome email.", cancellationToken: cancellationToken); + logger.LogInformation("Sending welcome email for user {UserId}.", userId); await SendEmailAsync(userId, cancellationToken); progress.Report(100); } } var jobId = await enqueuer.EnqueueAsync( - (jobs, ct, progress) => jobs.SendWelcomeAsync(userId, Job.Context, progress, ct), + (jobs, ct, progress) => jobs.SendWelcomeAsync(userId, progress, ct), new JobSubmission( Priority: 10, ConcurrencyGroupKeys: ["email"], @@ -147,6 +148,17 @@ var jobId = await enqueuer.EnqueueAsync( cancellationToken); ``` +Sheddueller captures job logs by registering a Microsoft `ILoggerProvider`. If the application uses Serilog as the host logger and still wants Sheddueller's provider to receive log events, enable provider forwarding: + +```csharp +builder.Host.UseSerilog( + (_, _, loggerConfiguration) => loggerConfiguration.WriteTo.Logger(Log.Logger), + preserveStaticLogger: true, + writeToProviders: true); +``` + +Because Sheddueller's capture is a Microsoft `ILoggerProvider`, Microsoft logging filters still apply. Configure `Logging`/`ILoggingBuilder` filters alongside any Serilog filtering rules if you want to control which job logs are stored by Sheddueller. + Use `NotBeforeUtc` for delayed jobs. Use `JobIdempotencyKind.MethodAndArguments` to reuse an existing queued job with the same target method and serialized arguments. ## Recurring Schedules @@ -157,7 +169,7 @@ Recurring schedules are keyed definitions. Calling `CreateOrUpdateAsync` at star await schedules.CreateOrUpdateAsync( "email:daily-digest", "0 2 * * *", - (jobs, ct, progress) => jobs.SendDailyDigestAsync(Job.Context, progress, ct), + (jobs, ct, progress) => jobs.SendDailyDigestAsync(progress, ct), new RecurringScheduleOptions( Priority: 5, ConcurrencyGroupKeys: ["email"], @@ -192,7 +204,7 @@ var capture = provider.GetRequiredService().Capture(); await subject.DoSomethingThatEnqueuesAsync(); var matches = await capture.Fake.MatchAsync( - (jobs, ct, progress) => jobs.SendWelcomeAsync(userId, Job.Context, progress, ct)); + (jobs, ct, progress) => jobs.SendWelcomeAsync(userId, progress, ct)); ``` The same package includes `FakeJobEnqueuer`, `FakeRecurringScheduleManager`, and async-context-aware capture services for dependency-injected tests. diff --git a/samples/Sheddueller.SampleHost/DemoJobs/DemoJobService.cs b/samples/Sheddueller.SampleHost/DemoJobs/DemoJobService.cs index 8abbb8a..618a73e 100644 --- a/samples/Sheddueller.SampleHost/DemoJobs/DemoJobService.cs +++ b/samples/Sheddueller.SampleHost/DemoJobs/DemoJobService.cs @@ -2,9 +2,14 @@ namespace Sheddueller.SampleHost.DemoJobs; using System.Globalization; -public sealed class DemoJobService(DemoJobState state) +using Microsoft.Extensions.Logging; + +public sealed class DemoJobService( + DemoJobState state, + ILogger logger) { private readonly DemoJobState _state = state; + private readonly ILogger _logger = logger; public Task RunQuickAsync(string label, CancellationToken cancellationToken) { @@ -15,15 +20,13 @@ public Task RunQuickAsync(string label, CancellationToken cancellationToken) public async Task RunProgressAsync( string label, - IJobContext jobContext, IProgress progress, CancellationToken cancellationToken) { for (var step = 0; step <= 4; step++) { var percent = step * 25; - var message = $"{label} step {step + 1}/5"; - await jobContext.LogAsync(JobLogLevel.Information, message, cancellationToken: cancellationToken).ConfigureAwait(false); + ProgressStep(this._logger, label, step + 1, null); progress.Report(percent); if (step < 4) @@ -33,16 +36,17 @@ public async Task RunProgressAsync( } } - public async Task RunRetryUntilSuccessAsync( + public Task RunRetryUntilSuccessAsync( string runKey, int failuresBeforeSuccess, - IJobContext jobContext, IProgress progress, CancellationToken cancellationToken) { + cancellationToken.ThrowIfCancellationRequested(); + var attempt = this._state.IncrementAttemptCount(runKey); var message = $"retry demo {runKey} attempt {attempt}"; - await jobContext.LogAsync(JobLogLevel.Information, message, cancellationToken: cancellationToken).ConfigureAwait(false); + RetryAttempt(this._logger, runKey, attempt, null); if (attempt <= failuresBeforeSuccess) { @@ -50,26 +54,27 @@ public async Task RunRetryUntilSuccessAsync( } progress.Report(100); + return Task.CompletedTask; } - public async Task RunAlwaysFailAsync(string label, IJobContext jobContext, CancellationToken cancellationToken) + public Task RunAlwaysFailAsync(string label, CancellationToken cancellationToken) { - await jobContext.LogAsync(JobLogLevel.Error, $"{label} is about to fail permanently.", cancellationToken: cancellationToken).ConfigureAwait(false); + cancellationToken.ThrowIfCancellationRequested(); + PermanentFailure(this._logger, label, null); throw new InvalidOperationException($"{label} failed on purpose."); } public async Task RunGroupHoldAsync( string label, - IJobContext jobContext, IProgress progress, CancellationToken cancellationToken) { - await jobContext.LogAsync(JobLogLevel.Information, $"{label} claimed the demo concurrency slot.", cancellationToken: cancellationToken).ConfigureAwait(false); + GroupSlotClaimed(this._logger, label, null); for (var step = 1; step <= 6; step++) { var percent = step * (100m / 6m); - await jobContext.LogAsync(JobLogLevel.Information, $"{label} holding slot {step}/6", cancellationToken: cancellationToken).ConfigureAwait(false); + GroupSlotHeld(this._logger, label, step, null); progress.Report(percent); await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken).ConfigureAwait(false); } @@ -77,31 +82,77 @@ public async Task RunGroupHoldAsync( public async Task RunIdempotentDemoAsync( string label, - IJobContext jobContext, IProgress progress, CancellationToken cancellationToken) { - await jobContext.LogAsync(JobLogLevel.Information, $"{label} started its 10-second idempotency demo run.", cancellationToken: cancellationToken) - .ConfigureAwait(false); + IdempotentRunStarted(this._logger, label, null); for (var step = 1; step <= 10; step++) { await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); var percent = step * 10; - await jobContext.LogAsync(JobLogLevel.Information, $"{label} idempotent run {step}/10", cancellationToken: cancellationToken).ConfigureAwait(false); + IdempotentRunStep(this._logger, label, step, null); progress.Report(percent); } } - public async Task RunRecurringAsync( - IJobContext jobContext, + public Task RunRecurringAsync( IProgress progress, CancellationToken cancellationToken) { - var message = string.Create( - CultureInfo.InvariantCulture, - $"Recurring demo fired at {DateTimeOffset.UtcNow:O}"); - await jobContext.LogAsync(JobLogLevel.Information, message, cancellationToken: cancellationToken).ConfigureAwait(false); + cancellationToken.ThrowIfCancellationRequested(); + + var firedAtUtc = DateTimeOffset.UtcNow.ToString("O", CultureInfo.InvariantCulture); + RecurringFired(this._logger, firedAtUtc, null); progress.Report(100); + return Task.CompletedTask; } + + private static readonly Action ProgressStep = + LoggerMessage.Define( + LogLevel.Information, + new EventId(100, nameof(ProgressStep)), + "{Label} step {Step}/5"); + + private static readonly Action RetryAttempt = + LoggerMessage.Define( + LogLevel.Information, + new EventId(101, nameof(RetryAttempt)), + "retry demo {RunKey} attempt {Attempt}"); + + private static readonly Action PermanentFailure = + LoggerMessage.Define( + LogLevel.Error, + new EventId(102, nameof(PermanentFailure)), + "{Label} is about to fail permanently."); + + private static readonly Action GroupSlotClaimed = + LoggerMessage.Define( + LogLevel.Information, + new EventId(103, nameof(GroupSlotClaimed)), + "{Label} claimed the demo concurrency slot."); + + private static readonly Action GroupSlotHeld = + LoggerMessage.Define( + LogLevel.Information, + new EventId(104, nameof(GroupSlotHeld)), + "{Label} holding slot {Step}/6"); + + private static readonly Action IdempotentRunStarted = + LoggerMessage.Define( + LogLevel.Information, + new EventId(105, nameof(IdempotentRunStarted)), + "{Label} started its 10-second idempotency demo run."); + + private static readonly Action IdempotentRunStep = + LoggerMessage.Define( + LogLevel.Information, + new EventId(106, nameof(IdempotentRunStep)), + "{Label} idempotent run {Step}/10"); + + private static readonly Action RecurringFired = + LoggerMessage.Define( + LogLevel.Information, + new EventId(107, nameof(RecurringFired)), + "Recurring demo fired at {FiredAtUtc}"); } diff --git a/samples/Sheddueller.SampleHost/LauncherPageRenderer.cs b/samples/Sheddueller.SampleHost/LauncherPageRenderer.cs index e054274..b685af7 100644 --- a/samples/Sheddueller.SampleHost/LauncherPageRenderer.cs +++ b/samples/Sheddueller.SampleHost/LauncherPageRenderer.cs @@ -67,7 +67,7 @@ public static string Render(string? statusMessage) builder.AppendLine(" "); builder.AppendLine("
"); AppendActionCard(builder, "/launch/quick-success", "Quick success", "Immediate completion for happy-path rows.", "Enqueue job"); - AppendActionCard(builder, "/launch/progress", "Progress + logs", "Emits durable logs and progress snapshots over a few seconds.", "Enqueue job"); + AppendActionCard(builder, "/launch/progress", "Progress + logs", "Emits captured ILogger logs and progress snapshots over a few seconds.", "Enqueue job"); AppendActionCard(builder, "/launch/retry-then-succeed", "Retry then succeed", "Fails twice, then succeeds so retry history is visible.", "Enqueue job"); AppendActionCard(builder, "/launch/permanent-failure", "Permanent failure", "Fails terminally without retries.", "Enqueue job"); AppendActionCard(builder, "/launch/delayed", "Delayed job", "Queues a short delayed job to exercise delayed state and not-before time.", "Enqueue job"); diff --git a/samples/Sheddueller.SampleHost/Program.cs b/samples/Sheddueller.SampleHost/Program.cs index 4645617..7fa5137 100644 --- a/samples/Sheddueller.SampleHost/Program.cs +++ b/samples/Sheddueller.SampleHost/Program.cs @@ -62,7 +62,7 @@ app.MapPost("/launch/progress", async (IJobEnqueuer enqueuer, CancellationToken cancellationToken) => { var jobId = await enqueuer.EnqueueAsync( - (service, ct, progress) => service.RunProgressAsync("progress-demo", Job.Context, progress, ct), + (service, ct, progress) => service.RunProgressAsync("progress-demo", progress, ct), cancellationToken: cancellationToken).ConfigureAwait(false); return RedirectWithMessage($"Queued progress demo job {jobId:D}."); }); @@ -71,7 +71,7 @@ { var runKey = $"retry-demo:{Guid.NewGuid():N}"; var jobId = await enqueuer.EnqueueAsync( - (service, ct, progress) => service.RunRetryUntilSuccessAsync(runKey, 2, Job.Context, progress, ct), + (service, ct, progress) => service.RunRetryUntilSuccessAsync(runKey, 2, progress, ct), new JobSubmission(RetryPolicy: new RetryPolicy(4, RetryBackoffKind.Fixed, TimeSpan.FromSeconds(2))), cancellationToken).ConfigureAwait(false); return RedirectWithMessage($"Queued retry demo job {jobId:D}. It will fail twice before succeeding."); @@ -80,7 +80,7 @@ app.MapPost("/launch/permanent-failure", async (IJobEnqueuer enqueuer, CancellationToken cancellationToken) => { var jobId = await enqueuer.EnqueueAsync( - (service, ct) => service.RunAlwaysFailAsync("permanent-failure", Job.Context, ct), + (service, ct) => service.RunAlwaysFailAsync("permanent-failure", ct), new JobSubmission(RetryPolicy: new RetryPolicy(1, RetryBackoffKind.Fixed, TimeSpan.FromSeconds(1))), cancellationToken).ConfigureAwait(false); return RedirectWithMessage($"Queued permanent failure job {jobId:D}."); @@ -99,7 +99,7 @@ app.MapPost("/launch/many-tags", async (IJobEnqueuer enqueuer, CancellationToken cancellationToken) => { var jobId = await enqueuer.EnqueueAsync( - (service, ct, progress) => service.RunProgressAsync("many-tags-demo", Job.Context, progress, ct), + (service, ct, progress) => service.RunProgressAsync("many-tags-demo", progress, ct), new JobSubmission( Priority: 15, Tags: @@ -129,7 +129,7 @@ { var label = $"blocking-{index}"; var jobId = await enqueuer.EnqueueAsync( - (service, ct, progress) => service.RunGroupHoldAsync(label, Job.Context, progress, ct), + (service, ct, progress) => service.RunGroupHoldAsync(label, progress, ct), new JobSubmission(Priority: 25, ConcurrencyGroupKeys: [GroupKey]), cancellationToken).ConfigureAwait(false); jobIds.Add(jobId); @@ -152,7 +152,7 @@ if (!await HasNonTerminalJobsInGroupAsync(inspectionReader, GroupKey, cancellationToken).ConfigureAwait(false)) { _ = await enqueuer.EnqueueAsync( - (service, ct, progress) => service.RunIdempotentDemoAsync("idempotent-demo-slot-holder", Job.Context, progress, ct), + (service, ct, progress) => service.RunIdempotentDemoAsync("idempotent-demo-slot-holder", progress, ct), new JobSubmission( Priority: 50, ConcurrencyGroupKeys: [GroupKey], @@ -161,7 +161,7 @@ } var jobId = await enqueuer.EnqueueAsync( - (service, ct, progress) => service.RunIdempotentDemoAsync(WorkLabel, Job.Context, progress, ct), + (service, ct, progress) => service.RunIdempotentDemoAsync(WorkLabel, progress, ct), new JobSubmission( Priority: 25, ConcurrencyGroupKeys: [GroupKey], @@ -177,7 +177,7 @@ var result = await scheduleManager.CreateOrUpdateAsync( "demo:recurring", "* * * * *", - (service, ct, progress) => service.RunRecurringAsync(Job.Context, progress, ct), + (service, ct, progress) => service.RunRecurringAsync(progress, ct), new RecurringScheduleOptions(Priority: 10, OverlapMode: RecurringOverlapMode.Skip), cancellationToken).ConfigureAwait(false); return RedirectWithMessage($"Recurring schedule 'demo:recurring' is {result}. The next occurrence will fire on the next minute boundary."); diff --git a/samples/Sheddueller.SampleHost/README.md b/samples/Sheddueller.SampleHost/README.md index 27a8807..57b6af9 100644 --- a/samples/Sheddueller.SampleHost/README.md +++ b/samples/Sheddueller.SampleHost/README.md @@ -47,7 +47,7 @@ The sample applies PostgreSQL schema migrations automatically on startup and reg ## Launcher Scenarios - `Quick success`: immediate completion -- `Progress + logs`: emits durable logs and `IProgress` progress updates +- `Progress + logs`: emits `ILogger` messages captured as durable job logs, plus `IProgress` progress updates - `Retry then succeed`: fails twice, then succeeds - `Permanent failure`: terminal failure without retries - `Delayed job`: waits 30 seconds before becoming claimable diff --git a/src/Sheddueller.Worker/Internal/CapturedJobLogContext.cs b/src/Sheddueller.Worker/Internal/CapturedJobLogContext.cs new file mode 100644 index 0000000..73ef54b --- /dev/null +++ b/src/Sheddueller.Worker/Internal/CapturedJobLogContext.cs @@ -0,0 +1,6 @@ +namespace Sheddueller.Worker.Internal; + +internal sealed record CapturedJobLogContext( + Guid ExecutionId, + Guid JobId, + int AttemptNumber); diff --git a/src/Sheddueller.Worker/Internal/JobLogCaptureContext.cs b/src/Sheddueller.Worker/Internal/JobLogCaptureContext.cs new file mode 100644 index 0000000..694d2b4 --- /dev/null +++ b/src/Sheddueller.Worker/Internal/JobLogCaptureContext.cs @@ -0,0 +1,50 @@ +namespace Sheddueller.Worker.Internal; + +using System.Collections.Concurrent; + +internal static class JobLogCaptureContext +{ + private static readonly AsyncLocal Current = new(); + private static readonly ConcurrentDictionary ActiveExecutions = new(); + + public static CapturedJobLogContext? Active + { + get + { + var current = Current.Value; + return current is not null && ActiveExecutions.ContainsKey(current.ExecutionId) + ? current + : null; + } + } + + public static IDisposable Begin( + Guid jobId, + int attemptNumber) + { + var previous = Current.Value; + var current = new CapturedJobLogContext(Guid.NewGuid(), jobId, attemptNumber); + ActiveExecutions.TryAdd(current.ExecutionId, 0); + Current.Value = current; + + return new Scope(current.ExecutionId, previous); + } + + private sealed class Scope( + Guid executionId, + CapturedJobLogContext? previous) : IDisposable + { + private int _disposed; + + public void Dispose() + { + if (Interlocked.Exchange(ref this._disposed, 1) != 0) + { + return; + } + + ActiveExecutions.TryRemove(executionId, out _); + Current.Value = previous; + } + } +} diff --git a/src/Sheddueller.Worker/Internal/ShedduellerJobLogEventDispatcher.cs b/src/Sheddueller.Worker/Internal/ShedduellerJobLogEventDispatcher.cs new file mode 100644 index 0000000..3cf9554 --- /dev/null +++ b/src/Sheddueller.Worker/Internal/ShedduellerJobLogEventDispatcher.cs @@ -0,0 +1,81 @@ +namespace Sheddueller.Worker.Internal; + +using System.Diagnostics.CodeAnalysis; + +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +using Sheddueller.Storage; + +internal sealed class ShedduellerJobLogEventDispatcher( + ShedduellerJobLogEventQueue queue, + IJobEventSink eventSink, + ILogger logger) : BackgroundService +{ + private const int MaxBatchSize = 64; + private static readonly TimeSpan FlushInterval = TimeSpan.FromMilliseconds(250); + + private readonly ShedduellerJobLogEventQueue _queue = queue; + private readonly IJobEventSink _eventSink = eventSink; + private readonly ILogger _logger = logger; + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var batch = new List(MaxBatchSize); + + try + { + while (await this._queue.Reader.WaitToReadAsync(stoppingToken).ConfigureAwait(false)) + { + this.DrainAvailable(batch); + if (batch.Count < MaxBatchSize) + { + await Task.Delay(FlushInterval, stoppingToken).ConfigureAwait(false); + this.DrainAvailable(batch); + } + + await this.FlushAsync(batch).ConfigureAwait(false); + } + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + // Shutdown drains any queued log events below. + } + finally + { + this.DrainAvailable(batch, int.MaxValue); + await this.FlushAsync(batch).ConfigureAwait(false); + } + } + + private void DrainAvailable(List batch, int maxBatchSize = MaxBatchSize) + { + while (batch.Count < maxBatchSize && this._queue.Reader.TryRead(out var request)) + { + batch.Add(request); + } + } + + [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Captured job logging is best-effort and must not fail the dispatcher.")] + private async ValueTask FlushAsync(List batch) + { + try + { + foreach (var request in batch) + { + try + { + await this._eventSink.AppendAsync(request).ConfigureAwait(false); + } + catch (Exception exception) + { + this._logger.JobEventAppendFailed(exception, request.JobId); + } + } + } + finally + { + batch.Clear(); + } + } +} diff --git a/src/Sheddueller.Worker/Internal/ShedduellerJobLogEventQueue.cs b/src/Sheddueller.Worker/Internal/ShedduellerJobLogEventQueue.cs new file mode 100644 index 0000000..5cc5d29 --- /dev/null +++ b/src/Sheddueller.Worker/Internal/ShedduellerJobLogEventQueue.cs @@ -0,0 +1,26 @@ +namespace Sheddueller.Worker.Internal; + +using System.Threading.Channels; + +using Sheddueller.Storage; + +internal sealed class ShedduellerJobLogEventQueue +{ + private readonly Channel _channel = Channel.CreateUnbounded( + new UnboundedChannelOptions + { + AllowSynchronousContinuations = false, + SingleReader = true, + SingleWriter = false, + }); + + public ChannelReader Reader + => this._channel.Reader; + + public bool TryEnqueue(AppendJobEventRequest request) + { + ArgumentNullException.ThrowIfNull(request); + + return this._channel.Writer.TryWrite(request); + } +} diff --git a/src/Sheddueller.Worker/Internal/ShedduellerJobLoggerProvider.cs b/src/Sheddueller.Worker/Internal/ShedduellerJobLoggerProvider.cs new file mode 100644 index 0000000..ba2cf9f --- /dev/null +++ b/src/Sheddueller.Worker/Internal/ShedduellerJobLoggerProvider.cs @@ -0,0 +1,150 @@ +namespace Sheddueller.Worker.Internal; + +using System.Diagnostics.CodeAnalysis; +using System.Globalization; + +using Microsoft.Extensions.Logging; + +using Sheddueller.Storage; + +internal sealed class ShedduellerJobLoggerProvider(ShedduellerJobLogEventQueue eventQueue) : ILoggerProvider +{ + public ILogger CreateLogger(string categoryName) + => new JobLogger(eventQueue, categoryName); + + public void Dispose() + { + } + + private sealed class JobLogger( + ShedduellerJobLogEventQueue eventQueue, + string categoryName) : ILogger + { + public IDisposable BeginScope(TState state) + where TState : notnull + => NullScope.Instance; + + public bool IsEnabled(LogLevel logLevel) + => logLevel != LogLevel.None && JobLogCaptureContext.Active is not null; + + [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Captured job logging is best-effort and must not fail jobs.")] + public void Log( + LogLevel logLevel, + EventId eventId, + TState state, + Exception? exception, + Func formatter) + { + if (logLevel == LogLevel.None) + { + return; + } + + var activeJob = JobLogCaptureContext.Active; + if (activeJob is null) + { + return; + } + + try + { + ArgumentNullException.ThrowIfNull(formatter); + + var message = formatter(state, exception); + if (string.IsNullOrEmpty(message) && exception is not null) + { + message = exception.Message; + } + + var request = new AppendJobEventRequest( + activeJob.JobId, + JobEventKind.Log, + activeJob.AttemptNumber, + ToJobLogLevel(logLevel), + message, + Fields: this.CreateFields(eventId, state, exception)); + + _ = eventQueue.TryEnqueue(request); + } + catch (Exception) + { + // Captured logs are durable telemetry, not part of the job contract. + } + } + + private Dictionary? CreateFields( + EventId eventId, + TState state, + Exception? exception) + { + Dictionary? fields = null; + + AddField(ref fields, "LoggerCategory", categoryName); + if (eventId.Id != 0) + { + AddField(ref fields, "EventId", eventId.Id.ToString(CultureInfo.InvariantCulture)); + } + + if (!string.IsNullOrEmpty(eventId.Name)) + { + AddField(ref fields, "EventName", eventId.Name); + } + + if (state is IEnumerable> values) + { + foreach (var (key, value) in values) + { + if (string.IsNullOrEmpty(key) || key == "{OriginalFormat}" || value is null) + { + continue; + } + + AddField(ref fields, key, FormatFieldValue(value)); + } + } + + if (exception is not null) + { + AddField(ref fields, "ExceptionType", exception.GetType().FullName ?? exception.GetType().Name); + AddField(ref fields, "ExceptionMessage", exception.Message); + } + + return fields; + } + + private static void AddField( + ref Dictionary? fields, + string key, + string value) + { + fields ??= new Dictionary(StringComparer.Ordinal); + fields[key] = value; + } + + private static string FormatFieldValue(object value) + => value is IFormattable formattable + ? formattable.ToString(format: null, CultureInfo.InvariantCulture) + : value.ToString() ?? string.Empty; + + private static JobLogLevel ToJobLogLevel(LogLevel logLevel) + => logLevel switch + { + LogLevel.Trace => JobLogLevel.Trace, + LogLevel.Debug => JobLogLevel.Debug, + LogLevel.Information => JobLogLevel.Information, + LogLevel.Warning => JobLogLevel.Warning, + LogLevel.Error => JobLogLevel.Error, + LogLevel.Critical => JobLogLevel.Critical, + _ => JobLogLevel.Information, + }; + } + + private sealed class NullScope : IDisposable + { + public static readonly NullScope Instance = new(); + + public void Dispose() + { + } + } +} diff --git a/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs b/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs index c03127c..b38203c 100644 --- a/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs +++ b/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs @@ -175,11 +175,14 @@ private async ValueTask InvokeClaimedJobAsync(ClaimedJob job, CancellationToken var serializableParameterTypes = methodParameterTypes .Where((_, index) => parameterBindings[index].Kind == JobMethodParameterBindingKind.Serialized) .ToArray(); - var jobContext = new JobContext(job.JobId, job.AttemptCount, this._jobEventSink, this._jobContextLogger, executionToken); + var jobContext = new JobContext(job.JobId, job.AttemptCount, executionToken); var scope = this._scopeFactory.CreateAsyncScope(); await using (scope.ConfigureAwait(false)) { + using var loggingScope = this._logger.BeginScope(CreateJobLoggingScope(job, this._nodeIdProvider.NodeId)); + using var logCapture = JobLogCaptureContext.Begin(job.JobId, job.AttemptCount); + var (target, bindingFlags) = job.InvocationTargetKind switch { JobInvocationTargetKind.Static => ( @@ -242,6 +245,16 @@ private async ValueTask InvokeClaimedJobAsync(ClaimedJob job, CancellationToken } } + private static Dictionary CreateJobLoggingScope( + ClaimedJob job, + string nodeId) + => new(StringComparer.Ordinal) + { + ["ShedduellerJobId"] = job.JobId, + ["ShedduellerAttemptNumber"] = job.AttemptCount, + ["ShedduellerNodeId"] = nodeId, + }; + private static object?[] BuildInvocationArguments( IServiceProvider serviceProvider, Type[] methodParameterTypes, diff --git a/src/Sheddueller.Worker/ShedduellerWorkerServiceCollectionExtensions.cs b/src/Sheddueller.Worker/ShedduellerWorkerServiceCollectionExtensions.cs index 00316d8..01e922c 100644 --- a/src/Sheddueller.Worker/ShedduellerWorkerServiceCollectionExtensions.cs +++ b/src/Sheddueller.Worker/ShedduellerWorkerServiceCollectionExtensions.cs @@ -4,6 +4,7 @@ namespace Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Sheddueller; using Sheddueller.Runtime; @@ -28,6 +29,9 @@ public static IServiceCollection AddShedduellerWorker( services.AddSheddueller(configure); services.TryAddSingleton(); + services.TryAddSingleton(); + services.TryAddEnumerable(ServiceDescriptor.Singleton()); + services.TryAddEnumerable(ServiceDescriptor.Singleton()); services.TryAddEnumerable(ServiceDescriptor.Singleton()); TryAddStartupValidationHostedService(services); services.TryAddEnumerable(ServiceDescriptor.Singleton()); diff --git a/src/Sheddueller/IJobContext.cs b/src/Sheddueller/IJobContext.cs index c5121f5..d281aed 100644 --- a/src/Sheddueller/IJobContext.cs +++ b/src/Sheddueller/IJobContext.cs @@ -19,13 +19,4 @@ public interface IJobContext /// Gets the scheduler-owned execution cancellation token. /// CancellationToken CancellationToken { get; } - - /// - /// Writes a durable job log event. - /// - ValueTask LogAsync( - JobLogLevel level, - string message, - IReadOnlyDictionary? fields = null, - CancellationToken cancellationToken = default); } diff --git a/src/Sheddueller/Runtime/JobContext.cs b/src/Sheddueller/Runtime/JobContext.cs index 8ab6a06..0060038 100644 --- a/src/Sheddueller/Runtime/JobContext.cs +++ b/src/Sheddueller/Runtime/JobContext.cs @@ -1,16 +1,8 @@ namespace Sheddueller.Runtime; -using System.Diagnostics.CodeAnalysis; - -using Microsoft.Extensions.Logging; - -using Sheddueller.Storage; - internal sealed class JobContext( Guid jobId, int attemptNumber, - IJobEventSink eventSink, - ILogger logger, CancellationToken cancellationToken) : IJobContext { public Guid JobId { get; } = jobId; @@ -18,60 +10,4 @@ internal sealed class JobContext( public int AttemptNumber { get; } = attemptNumber; public CancellationToken CancellationToken { get; } = cancellationToken; - - public async ValueTask LogAsync( - JobLogLevel level, - string message, - IReadOnlyDictionary? fields = null, - CancellationToken cancellationToken = default) - { - ArgumentNullException.ThrowIfNull(message); - ValidateFields(fields); - - await this.AppendBestEffortAsync( - new AppendJobEventRequest(this.JobId, JobEventKind.Log, this.AttemptNumber, level, message, Fields: fields), - cancellationToken) - .ConfigureAwait(false); - } - - [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Job-context telemetry is best-effort by v4 design.")] - private async ValueTask AppendBestEffortAsync( - AppendJobEventRequest request, - CancellationToken cancellationToken) - { - try - { - await eventSink.AppendAsync(request, cancellationToken).ConfigureAwait(false); - } - catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) - { - throw; - } - catch (Exception exception) - { - logger.JobEventAppendFailed(exception, request.JobId); - // Best-effort telemetry must not fail the owning job. - } - } - - private static void ValidateFields(IReadOnlyDictionary? fields) - { - if (fields is null) - { - return; - } - - foreach (var (key, value) in fields) - { - if (key is null) - { - throw new ArgumentException("Job log field names cannot be null.", nameof(fields)); - } - - if (value is null) - { - throw new ArgumentException("Job log field values cannot be null.", nameof(fields)); - } - } - } } diff --git a/test/Sheddueller.Tests/JobContextLoggingTests.cs b/test/Sheddueller.Tests/JobContextLoggingTests.cs deleted file mode 100644 index 918df4f..0000000 --- a/test/Sheddueller.Tests/JobContextLoggingTests.cs +++ /dev/null @@ -1,44 +0,0 @@ -namespace Sheddueller.Tests; - -using Microsoft.Extensions.Logging; - -using Sheddueller.Runtime; -using Sheddueller.Storage; -using Sheddueller.Tests.Logging; - -using Shouldly; - -public sealed class JobContextLoggingTests -{ - [Fact] - public async Task Log_EventSinkFails_LogsDiagnosticAndDoesNotFailJob() - { - var jobId = Guid.NewGuid(); - using var logs = new TestLoggerProvider(); - using var loggerFactory = LoggerFactory.Create(builder => builder - .SetMinimumLevel(LogLevel.Trace) - .AddProvider(logs)); - var context = new JobContext( - jobId, - 1, - new FailingJobEventSink(), - loggerFactory.CreateLogger(), - CancellationToken.None); - - await context.LogAsync(JobLogLevel.Information, "hello"); - - var entry = logs.SingleByEventId(1040); - entry.Level.ShouldBe(LogLevel.Warning); - entry.Exception.ShouldBeOfType(); - entry.Properties["JobId"].ShouldBe(jobId); - entry.MessageTemplate.ShouldBe("Failed to append durable job event for job {JobId}."); - } - - private sealed class FailingJobEventSink : IJobEventSink - { - public ValueTask AppendAsync( - AppendJobEventRequest request, - CancellationToken cancellationToken = default) - => throw new InvalidOperationException("append failed"); - } -} diff --git a/test/Sheddueller.Tests/JobEnqueuerTests.cs b/test/Sheddueller.Tests/JobEnqueuerTests.cs index 0376fac..31d6106 100644 --- a/test/Sheddueller.Tests/JobEnqueuerTests.cs +++ b/test/Sheddueller.Tests/JobEnqueuerTests.cs @@ -605,12 +605,5 @@ private sealed class StubJobContext : IJobContext public int AttemptNumber => 0; public CancellationToken CancellationToken => CancellationToken.None; - - public ValueTask LogAsync( - JobLogLevel level, - string message, - IReadOnlyDictionary? fields = null, - CancellationToken cancellationToken = default) - => ValueTask.CompletedTask; } } diff --git a/test/Sheddueller.Worker.Tests/RegistrationTests.cs b/test/Sheddueller.Worker.Tests/RegistrationTests.cs index 1f36fac..7155f34 100644 --- a/test/Sheddueller.Worker.Tests/RegistrationTests.cs +++ b/test/Sheddueller.Worker.Tests/RegistrationTests.cs @@ -25,7 +25,8 @@ public void AddShedduellerWorker_RegistersClientAndWorkerServices() provider.GetRequiredService().ShouldNotBeNull(); provider.GetRequiredService().ShouldBeSameAs(provider.GetRequiredService()); provider.GetRequiredService().ShouldNotBeNull(); - provider.GetServices().Count().ShouldBe(2); + provider.GetServices().Count().ShouldBe(3); + provider.GetServices().ShouldContain(service => service.GetType() == typeof(ShedduellerJobLogEventDispatcher)); provider.GetServices().ShouldContain(service => service.GetType() == typeof(ShedduellerWorker)); } diff --git a/test/Sheddueller.Worker.Tests/WorkerJobLoggerTests.cs b/test/Sheddueller.Worker.Tests/WorkerJobLoggerTests.cs new file mode 100644 index 0000000..8d0bacb --- /dev/null +++ b/test/Sheddueller.Worker.Tests/WorkerJobLoggerTests.cs @@ -0,0 +1,561 @@ +namespace Sheddueller.Worker.Tests; + +using System.Collections.Concurrent; + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +using Sheddueller.Serialization; +using Sheddueller.Storage; +using Sheddueller.Worker.Internal; + +using Shouldly; + +public sealed class WorkerJobLoggerTests +{ + [Fact] + public async Task JobExecution_LoggerWritesDuringJob_RecordsDurableLogEvent() + { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var job = CreateClaimedJob(nameof(LoggingJob.RunAsync)); + var store = new SingleClaimJobStore(job); + var eventSink = new RecordingJobEventSink(); + await using var provider = CreateProvider(store, eventSink); + var outsideLogger = provider.GetRequiredService>(); + OutsideLog(outsideLogger, null); + var hostedServices = await StartHostedServicesAsync(provider, cancellationTokenSource.Token); + + await store.Completed.Task.WaitAsync(cancellationTokenSource.Token); + await StopHostedServicesAsync(hostedServices, cancellationTokenSource.Token); + + var request = eventSink.Requests.ShouldHaveSingleItem(); + request.JobId.ShouldBe(job.JobId); + request.Kind.ShouldBe(JobEventKind.Log); + request.AttemptNumber.ShouldBe(job.AttemptCount); + request.LogLevel.ShouldBe(JobLogLevel.Information); + request.Message.ShouldBe("Processed item 123."); + request.Fields.ShouldNotBeNull()["LoggerCategory"].ShouldBe(typeof(LoggingJob).FullName!.Replace('+', '.')); + request.Fields["EventId"].ShouldBe("42"); + request.Fields["EventName"].ShouldBe("ProcessedItem"); + request.Fields["ItemId"].ShouldBe("123"); + } + + [Fact] + public async Task JobExecution_LoggerWritesDuringJob_AddsJobMetadataScopeToExternalLoggers() + { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var job = CreateClaimedJob(nameof(LoggingJob.RunAsync)); + var store = new SingleClaimJobStore(job); + var eventSink = new RecordingJobEventSink(); + using var logs = new ScopeRecordingLoggerProvider(); + await using var provider = CreateProvider(store, eventSink, loggerProvider: logs); + var hostedServices = await StartHostedServicesAsync(provider, cancellationTokenSource.Token); + + await store.Completed.Task.WaitAsync(cancellationTokenSource.Token); + await StopHostedServicesAsync(hostedServices, cancellationTokenSource.Token); + + var entry = logs.Entries.Single(log => log.EventId.Id == 42); + entry.ScopeProperties["ShedduellerJobId"].ShouldBe(job.JobId); + entry.ScopeProperties["ShedduellerAttemptNumber"].ShouldBe(job.AttemptCount); + entry.ScopeProperties["ShedduellerNodeId"].ShouldBe("worker-logger"); + } + + [Fact] + public async Task JobExecution_FireAndForgetLoggerAfterCompletion_DoesNotRecordDurableLogEvent() + { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var job = CreateClaimedJob(nameof(LateLoggingJob.RunAsync)); + var store = new SingleClaimJobStore(job); + var eventSink = new RecordingJobEventSink(); + var coordinator = new LateLogCoordinator(); + await using var provider = CreateProvider(store, eventSink, coordinator); + var hostedServices = await StartHostedServicesAsync(provider, cancellationTokenSource.Token); + + await store.Completed.Task.WaitAsync(cancellationTokenSource.Token); + coordinator.AllowLateLog.SetResult(); + await coordinator.LateLogWritten.Task.WaitAsync(cancellationTokenSource.Token); + await StopHostedServicesAsync(hostedServices, cancellationTokenSource.Token); + + eventSink.Requests.ShouldBeEmpty(); + } + + [Fact] + public async Task JobExecution_LoggerSinkFails_StillCompletesJob() + { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var job = CreateClaimedJob(nameof(LoggingJob.RunAsync)); + var store = new SingleClaimJobStore(job); + await using var provider = CreateProvider(store, new ThrowingJobEventSink()); + var hostedServices = await StartHostedServicesAsync(provider, cancellationTokenSource.Token); + + var completed = await store.Completed.Task.WaitAsync(cancellationTokenSource.Token); + await StopHostedServicesAsync(hostedServices, cancellationTokenSource.Token); + + completed.JobId.ShouldBe(job.JobId); + } + + [Fact] + public async Task JobExecution_LoggerFormatterFails_StillCompletesJob() + { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var job = CreateClaimedJob(nameof(ThrowingFormatterJob.RunAsync)); + var store = new SingleClaimJobStore(job); + await using var provider = CreateProvider(store, new RecordingJobEventSink()); + var hostedServices = await StartHostedServicesAsync(provider, cancellationTokenSource.Token); + + var completed = await store.Completed.Task.WaitAsync(cancellationTokenSource.Token); + await StopHostedServicesAsync(hostedServices, cancellationTokenSource.Token); + + completed.JobId.ShouldBe(job.JobId); + } + + [Fact] + public async Task JobExecution_LoggerSinkBlocks_StillCompletesJob() + { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var job = CreateClaimedJob(nameof(LoggingJob.RunAsync)); + var store = new SingleClaimJobStore(job); + var eventSink = new BlockingJobEventSink(); + await using var provider = CreateProvider(store, eventSink); + var hostedServices = await StartHostedServicesAsync(provider, cancellationTokenSource.Token); + + var completed = await store.Completed.Task.WaitAsync(cancellationTokenSource.Token); + eventSink.AllowAppend.SetResult(); + await StopHostedServicesAsync(hostedServices, cancellationTokenSource.Token); + + completed.JobId.ShouldBe(job.JobId); + } + + private static ServiceProvider CreateProvider( + SingleClaimJobStore store, + IJobEventSink eventSink, + LateLogCoordinator? coordinator = null, + ILoggerProvider? loggerProvider = null) + where TJob : class + { + var services = new ServiceCollection(); + services.AddLogging(builder => + { + builder.SetMinimumLevel(LogLevel.Trace); + if (loggerProvider is not null) + { + builder.AddProvider(loggerProvider); + } + }); + services.AddSingleton(eventSink); + services.AddSingleton(store); + services.AddSingleton(serviceProvider => serviceProvider.GetRequiredService()); + if (coordinator is not null) + { + services.AddSingleton(coordinator); + } + + services.AddTransient(); + services.AddShedduellerWorker(builder => builder.ConfigureOptions(options => + { + options.NodeId = "worker-logger"; + options.IdlePollingInterval = TimeSpan.FromMilliseconds(10); + options.HeartbeatInterval = TimeSpan.FromSeconds(5); + options.LeaseDuration = TimeSpan.FromSeconds(30); + })); + + return services.BuildServiceProvider(); + } + + private static async Task> StartHostedServicesAsync( + ServiceProvider provider, + CancellationToken cancellationToken) + { + var hostedServices = provider.GetServices().ToArray(); + foreach (var hostedService in hostedServices) + { + await hostedService.StartAsync(cancellationToken); + } + + return hostedServices; + } + + private static async Task StopHostedServicesAsync( + IEnumerable hostedServices, + CancellationToken cancellationToken) + { + foreach (var hostedService in hostedServices.Reverse()) + { + await hostedService.StopAsync(cancellationToken); + } + } + + private static ClaimedJob CreateClaimedJob(string methodName) + => new( + Guid.NewGuid(), + EnqueueSequence: 1, + Priority: 0, + ServiceType: typeof(TJob).AssemblyQualifiedName!, + MethodName: methodName, + MethodParameterTypes: [typeof(CancellationToken).AssemblyQualifiedName!], + SerializedArguments: new SerializedJobPayload(SystemTextJsonJobPayloadSerializer.JsonContentType, "[]"u8.ToArray()), + ConcurrencyGroupKeys: [], + AttemptCount: 1, + MaxAttempts: 1, + LeaseToken: Guid.NewGuid(), + LeaseExpiresAtUtc: DateTimeOffset.UtcNow.AddSeconds(30), + RetryBackoffKind: null, + RetryBaseDelay: null, + RetryMaxDelay: null, + SourceScheduleKey: null, + ScheduledFireAtUtc: null, + MethodParameterBindings: [new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken)]); + + private static readonly Action OutsideLog = + LoggerMessage.Define( + LogLevel.Information, + new EventId(41, nameof(OutsideLog)), + "outside log"); + + private static readonly Action ProcessedItem = + LoggerMessage.Define( + LogLevel.Information, + new EventId(42, nameof(ProcessedItem)), + "Processed item {ItemId}."); + + private static readonly Action LateLog = + LoggerMessage.Define( + LogLevel.Information, + new EventId(43, nameof(LateLog)), + "late log"); + + private sealed class LoggingJob(ILogger logger) + { + public Task RunAsync(CancellationToken cancellationToken) + { + ProcessedItem(logger, 123, null); + return Task.CompletedTask; + } + } + + private sealed class LateLoggingJob( + ILogger logger, + LateLogCoordinator coordinator) + { + public Task RunAsync(CancellationToken cancellationToken) + { + _ = Task.Run(async () => + { + await coordinator.AllowLateLog.Task.ConfigureAwait(false); + LateLog(logger, null); + coordinator.LateLogWritten.SetResult(); + }, CancellationToken.None); + + return Task.CompletedTask; + } + } + + private sealed class ThrowingFormatterJob(ILogger logger) + { + public Task RunAsync(CancellationToken cancellationToken) + { + logger.Log( + LogLevel.Information, + new EventId(44, nameof(ThrowingFormatterJob)), + state: null, + exception: null, + formatter: static (_, _) => throw new InvalidOperationException("formatter failed")); + + return Task.CompletedTask; + } + } + + private sealed class LateLogCoordinator + { + public TaskCompletionSource AllowLateLog { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public TaskCompletionSource LateLogWritten { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); + } + + private sealed class RecordingJobEventSink : IJobEventSink + { + private readonly Lock _syncRoot = new(); + private readonly List _requests = []; + + public IReadOnlyList Requests + { + get + { + lock (this._syncRoot) + { + return Array.AsReadOnly([.. this._requests]); + } + } + } + + public ValueTask AppendAsync( + AppendJobEventRequest request, + CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); + + lock (this._syncRoot) + { + this._requests.Add(request); + return ValueTask.FromResult(new JobEvent( + Guid.NewGuid(), + request.JobId, + this._requests.Count, + request.Kind, + DateTimeOffset.UtcNow, + request.AttemptNumber, + request.LogLevel, + request.Message, + request.ProgressPercent, + request.Fields)); + } + } + } + + private sealed class ThrowingJobEventSink : IJobEventSink + { + public ValueTask AppendAsync( + AppendJobEventRequest request, + CancellationToken cancellationToken = default) + => throw new InvalidOperationException("append failed"); + } + + private sealed class BlockingJobEventSink : IJobEventSink + { + public TaskCompletionSource AllowAppend { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public async ValueTask AppendAsync( + AppendJobEventRequest request, + CancellationToken cancellationToken = default) + { + await this.AllowAppend.Task.WaitAsync(cancellationToken).ConfigureAwait(false); + + return new JobEvent( + Guid.NewGuid(), + request.JobId, + EventSequence: 1, + request.Kind, + DateTimeOffset.UtcNow, + request.AttemptNumber, + request.LogLevel, + request.Message, + request.ProgressPercent, + request.Fields); + } + } + + private sealed class ScopeRecordingLoggerProvider : ILoggerProvider, ISupportExternalScope + { + private readonly ConcurrentQueue _entries = new(); + private IExternalScopeProvider _scopeProvider = new LoggerExternalScopeProvider(); + + public IReadOnlyList Entries + => [.. this._entries]; + + public ILogger CreateLogger(string categoryName) + => new ScopeRecordingLogger(categoryName, this); + + public void Dispose() + { + } + + public void SetScopeProvider(IExternalScopeProvider scopeProvider) + => this._scopeProvider = scopeProvider; + + private void Add( + string categoryName, + LogLevel logLevel, + EventId eventId, + TState state, + Exception? exception, + Func formatter) + { + this._entries.Enqueue(new ScopeLogEntry( + categoryName, + logLevel, + eventId, + formatter(state, exception), + ReadScopeProperties(this._scopeProvider))); + } + + private static Dictionary ReadScopeProperties(IExternalScopeProvider scopeProvider) + { + var properties = new Dictionary(StringComparer.Ordinal); + scopeProvider.ForEachScope(static (scope, state) => + { + if (scope is not IEnumerable> pairs) + { + return; + } + + foreach (var (key, value) in pairs) + { + state[key] = value; + } + }, properties); + + return properties; + } + + private sealed class ScopeRecordingLogger( + string categoryName, + ScopeRecordingLoggerProvider provider) : ILogger + { + public IDisposable BeginScope(TState state) + where TState : notnull + => NullScope.Instance; + + public bool IsEnabled(LogLevel logLevel) + => true; + + public void Log( + LogLevel logLevel, + EventId eventId, + TState state, + Exception? exception, + Func formatter) + => provider.Add(categoryName, logLevel, eventId, state, exception, formatter); + } + + private sealed class NullScope : IDisposable + { + public static readonly NullScope Instance = new(); + + public void Dispose() + { + } + } + } + + private sealed record ScopeLogEntry( + string CategoryName, + LogLevel Level, + EventId EventId, + string Message, + IReadOnlyDictionary ScopeProperties); + + private sealed class SingleClaimJobStore(ClaimedJob job) : IJobStore + { + private int _claimed; + + public TaskCompletionSource Completed { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public ValueTask EnqueueAsync( + EnqueueJobRequest request, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask> EnqueueManyAsync( + IReadOnlyList requests, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask TryClaimNextAsync( + ClaimJobRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult( + Interlocked.Exchange(ref this._claimed, 1) == 0 + ? new ClaimJobResult.Claimed(job) + : new ClaimJobResult.NoJobAvailable()); + + public ValueTask MarkCompletedAsync( + CompleteJobRequest request, + CancellationToken cancellationToken = default) + { + this.Completed.TrySetResult(request); + return ValueTask.FromResult(true); + } + + public ValueTask MarkFailedAsync( + FailJobRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(true); + + public ValueTask RenewLeaseAsync( + RenewLeaseRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(true); + + public ValueTask ReleaseJobAsync( + ReleaseJobRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(true); + + public ValueTask RecoverExpiredLeasesAsync( + RecoverExpiredLeasesRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(0); + + public ValueTask CancelAsync( + CancelJobRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(JobCancellationResult.NotFound); + + public ValueTask GetCancellationRequestedAtAsync( + JobCancellationStatusRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(null); + + public ValueTask MarkCancellationObservedAsync( + ObserveJobCancellationRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(true); + + public ValueTask RecordWorkerNodeHeartbeatAsync( + WorkerNodeHeartbeatRequest request, + CancellationToken cancellationToken = default) + => ValueTask.CompletedTask; + + public ValueTask SetConcurrencyLimitAsync( + SetConcurrencyLimitRequest request, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask GetConfiguredConcurrencyLimitAsync( + string groupKey, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask CreateOrUpdateRecurringScheduleAsync( + UpsertRecurringScheduleRequest request, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask TriggerRecurringScheduleAsync( + TriggerRecurringScheduleRequest request, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask DeleteRecurringScheduleAsync( + string scheduleKey, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask PauseRecurringScheduleAsync( + string scheduleKey, + DateTimeOffset pausedAtUtc, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask ResumeRecurringScheduleAsync( + string scheduleKey, + DateTimeOffset resumedAtUtc, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask GetRecurringScheduleAsync( + string scheduleKey, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask> ListRecurringSchedulesAsync( + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask MaterializeDueRecurringSchedulesAsync( + MaterializeDueRecurringSchedulesRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(0); + } +}