Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ builder.Host.UseSerilog(

Because Sheddueller's capture is a Microsoft `ILoggerProvider`, Microsoft logging filters still apply. Configure `Logging`/`ILoggingBuilder` filters alongside any Serilog filtering rules if you want to control which job logs are stored by Sheddueller.

Set `ShedduellerOptions.EnableJobLogCapture = true` to enable durable capture of `ILogger<T>` job logs. This is disabled by default. The worker still opens the job logging scope, so external providers can continue receiving `ShedduellerJobId`, `ShedduellerAttemptNumber`, and `ShedduellerNodeId`.

Use `NotBeforeUtc` for delayed jobs. Use `JobIdempotencyKind.MethodAndArguments` to reuse an existing queued job with the same target method and serialized arguments.

## Recurring Schedules
Expand Down
14 changes: 10 additions & 4 deletions src/Sheddueller.Worker/Internal/ShedduellerJobLoggerProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,34 @@ namespace Sheddueller.Worker.Internal;
using System.Globalization;

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

using Sheddueller;
using Sheddueller.Storage;

internal sealed class ShedduellerJobLoggerProvider(ShedduellerJobLogEventQueue eventQueue) : ILoggerProvider
internal sealed class ShedduellerJobLoggerProvider(
ShedduellerJobLogEventQueue eventQueue,
IOptions<ShedduellerOptions> options) : ILoggerProvider
{
public ILogger CreateLogger(string categoryName)
=> new JobLogger(eventQueue, categoryName);
=> new JobLogger(eventQueue, options.Value.EnableJobLogCapture, categoryName);

public void Dispose()
{
}

private sealed class JobLogger(
ShedduellerJobLogEventQueue eventQueue,
bool enableJobLogCapture,
string categoryName) : ILogger
{
public IDisposable BeginScope<TState>(TState state)
where TState : notnull
=> NullScope.Instance;

public bool IsEnabled(LogLevel logLevel)
=> logLevel != LogLevel.None && JobLogCaptureContext.Active is not null;
=> logLevel != LogLevel.None
&& enableJobLogCapture;

[SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Captured job logging is best-effort and must not fail jobs.")]
public void Log<TState>(
Expand All @@ -35,7 +41,7 @@ public void Log<TState>(
Exception? exception,
Func<TState, Exception?, string> formatter)
{
if (logLevel == LogLevel.None)
if (logLevel == LogLevel.None || !enableJobLogCapture)
{
return;
}
Expand Down
5 changes: 5 additions & 0 deletions src/Sheddueller/ShedduellerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public sealed class ShedduellerOptions
/// </summary>
public RetryPolicy? DefaultRetryPolicy { get; set; }

/// <summary>
/// Gets or sets whether Microsoft ILogger messages written during job execution are captured as durable job log events.
/// </summary>
public bool EnableJobLogCapture { get; set; }

/// <summary>
/// Gets the effective stale worker node threshold.
/// </summary>
Expand Down
59 changes: 52 additions & 7 deletions test/Sheddueller.Worker.Tests/WorkerJobLoggerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@ public async Task JobExecution_LoggerWritesDuringJob_RecordsDurableLogEvent()
var job = CreateClaimedJob<LoggingJob>(nameof(LoggingJob.RunAsync));
var store = new SingleClaimJobStore(job);
var eventSink = new RecordingJobEventSink();
await using var provider = CreateProvider<LoggingJob>(store, eventSink);
await using var provider = CreateProvider<LoggingJob>(
store,
eventSink,
configureOptions: options => options.EnableJobLogCapture = true);
var outsideLogger = provider.GetRequiredService<ILogger<LoggingJob>>();
OutsideLog(outsideLogger, null);
var hostedServices = await StartHostedServicesAsync(provider, cancellationTokenSource.Token);

await store.Completed.Task.WaitAsync(cancellationTokenSource.Token);
await StopHostedServicesAsync(hostedServices, cancellationTokenSource.Token);

var request = eventSink.Requests.ShouldHaveSingleItem();
eventSink.Requests.Any(request => HasEventId(request, "41")).ShouldBeFalse();
var request = eventSink.Requests.Single(request => HasEventId(request, "42"));
request.JobId.ShouldBe(job.JobId);
request.Kind.ShouldBe(JobEventKind.Log);
request.AttemptNumber.ShouldBe(job.AttemptCount);
Expand Down Expand Up @@ -61,6 +65,27 @@ public async Task JobExecution_LoggerWritesDuringJob_AddsJobMetadataScopeToExter
entry.ScopeProperties["ShedduellerNodeId"].ShouldBe("worker-logger");
}

[Fact]
public async Task JobExecution_LogCaptureDisabledByDefault_DoesNotRecordDurableLogEventButKeepsExternalScope()
{
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var job = CreateClaimedJob<LoggingJob>(nameof(LoggingJob.RunAsync));
var store = new SingleClaimJobStore(job);
var eventSink = new RecordingJobEventSink();
using var logs = new ScopeRecordingLoggerProvider();
await using var provider = CreateProvider<LoggingJob>(store, eventSink, loggerProvider: logs);
var hostedServices = await StartHostedServicesAsync(provider, cancellationTokenSource.Token);

await store.Completed.Task.WaitAsync(cancellationTokenSource.Token);
await StopHostedServicesAsync(hostedServices, cancellationTokenSource.Token);

eventSink.Requests.ShouldBeEmpty();
var entry = logs.Entries.Single(log => log.EventId.Id == 42);
entry.ScopeProperties["ShedduellerJobId"].ShouldBe(job.JobId);
entry.ScopeProperties["ShedduellerAttemptNumber"].ShouldBe(job.AttemptCount);
entry.ScopeProperties["ShedduellerNodeId"].ShouldBe("worker-logger");
}

[Fact]
public async Task JobExecution_FireAndForgetLoggerAfterCompletion_DoesNotRecordDurableLogEvent()
{
Expand All @@ -69,7 +94,11 @@ public async Task JobExecution_FireAndForgetLoggerAfterCompletion_DoesNotRecordD
var store = new SingleClaimJobStore(job);
var eventSink = new RecordingJobEventSink();
var coordinator = new LateLogCoordinator();
await using var provider = CreateProvider<LateLoggingJob>(store, eventSink, coordinator);
await using var provider = CreateProvider<LateLoggingJob>(
store,
eventSink,
coordinator,
configureOptions: options => options.EnableJobLogCapture = true);
var hostedServices = await StartHostedServicesAsync(provider, cancellationTokenSource.Token);

await store.Completed.Task.WaitAsync(cancellationTokenSource.Token);
Expand All @@ -86,7 +115,10 @@ public async Task JobExecution_LoggerSinkFails_StillCompletesJob()
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var job = CreateClaimedJob<LoggingJob>(nameof(LoggingJob.RunAsync));
var store = new SingleClaimJobStore(job);
await using var provider = CreateProvider<LoggingJob>(store, new ThrowingJobEventSink());
await using var provider = CreateProvider<LoggingJob>(
store,
new ThrowingJobEventSink(),
configureOptions: options => options.EnableJobLogCapture = true);
var hostedServices = await StartHostedServicesAsync(provider, cancellationTokenSource.Token);

var completed = await store.Completed.Task.WaitAsync(cancellationTokenSource.Token);
Expand All @@ -101,7 +133,10 @@ public async Task JobExecution_LoggerFormatterFails_StillCompletesJob()
using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var job = CreateClaimedJob<ThrowingFormatterJob>(nameof(ThrowingFormatterJob.RunAsync));
var store = new SingleClaimJobStore(job);
await using var provider = CreateProvider<ThrowingFormatterJob>(store, new RecordingJobEventSink());
await using var provider = CreateProvider<ThrowingFormatterJob>(
store,
new RecordingJobEventSink(),
configureOptions: options => options.EnableJobLogCapture = true);
var hostedServices = await StartHostedServicesAsync(provider, cancellationTokenSource.Token);

var completed = await store.Completed.Task.WaitAsync(cancellationTokenSource.Token);
Expand All @@ -117,7 +152,10 @@ public async Task JobExecution_LoggerSinkBlocks_StillCompletesJob()
var job = CreateClaimedJob<LoggingJob>(nameof(LoggingJob.RunAsync));
var store = new SingleClaimJobStore(job);
var eventSink = new BlockingJobEventSink();
await using var provider = CreateProvider<LoggingJob>(store, eventSink);
await using var provider = CreateProvider<LoggingJob>(
store,
eventSink,
configureOptions: options => options.EnableJobLogCapture = true);
var hostedServices = await StartHostedServicesAsync(provider, cancellationTokenSource.Token);

var completed = await store.Completed.Task.WaitAsync(cancellationTokenSource.Token);
Expand All @@ -131,7 +169,8 @@ private static ServiceProvider CreateProvider<TJob>(
SingleClaimJobStore store,
IJobEventSink eventSink,
LateLogCoordinator? coordinator = null,
ILoggerProvider? loggerProvider = null)
ILoggerProvider? loggerProvider = null,
Action<ShedduellerOptions>? configureOptions = null)
where TJob : class
{
var services = new ServiceCollection();
Expand All @@ -158,11 +197,17 @@ private static ServiceProvider CreateProvider<TJob>(
options.IdlePollingInterval = TimeSpan.FromMilliseconds(10);
options.HeartbeatInterval = TimeSpan.FromSeconds(5);
options.LeaseDuration = TimeSpan.FromSeconds(30);
configureOptions?.Invoke(options);
}));

return services.BuildServiceProvider();
}

private static bool HasEventId(AppendJobEventRequest request, string eventId)
=> request.Fields is not null
&& request.Fields.TryGetValue("EventId", out var actualEventId)
&& string.Equals(actualEventId, eventId, StringComparison.Ordinal);

private static async Task<IReadOnlyList<IHostedService>> StartHostedServicesAsync(
ServiceProvider provider,
CancellationToken cancellationToken)
Expand Down