From edc4d5fdf23b95e4be9727ad76ddec6ef7dd6074 Mon Sep 17 00:00:00 2001 From: Brian Tyler Date: Sun, 26 Apr 2026 14:32:25 +0100 Subject: [PATCH 1/6] feat(logging): integrate Microsoft.Extensions.Logging for enhanced diagnostics - Added logging capabilities across various components including JobEnqueuer, ConcurrencyGroupManager, JobContext, JobManager, and RecurringScheduleManager. - Introduced structured logging messages for job enqueueing, cancellation, and recurring schedule management. - Implemented logger message patterns using source-generated LoggerMessage extension methods. - Created a TestLoggerProvider for testing logging behavior in unit tests. - Added logging tests to ensure correct logging behavior in various scenarios. - Updated Sheddueller project dependencies to include Microsoft.Extensions.Logging. - Documented logging practices and guidelines in a new logging.md file. --- Directory.Packages.props | 1 + docs/logging.md | 56 +++++ .../DashboardJobEventListenerService.cs | 5 +- .../Internal/JobEventRetentionService.cs | 28 ++- .../ShedduellerDashboardLoggerMessages.cs | 47 ++++ .../Internal/SignalRJobEventNotifier.cs | 22 +- .../Internal/PostgresJobEventListener.cs | 39 +++- .../Internal/PostgresWakeSignal.cs | 14 +- .../ShedduellerPostgresLoggerMessages.cs | 69 ++++++ .../Internal/ShedduellerWorker.cs | 46 +++- .../ShedduellerWorkerLoggerMessages.cs | 123 +++++++++++ src/Sheddueller/Enqueueing/JobEnqueuer.cs | 14 +- .../Logging/ShedduellerLoggerMessages.cs | 105 +++++++++ .../Runtime/ConcurrencyGroupManager.cs | 9 +- src/Sheddueller/Runtime/JobContext.cs | 6 +- src/Sheddueller/Runtime/JobManager.cs | 15 +- .../Runtime/RecurringScheduleManager.cs | 28 ++- src/Sheddueller/Sheddueller.csproj | 1 + .../ShedduellerServiceCollectionExtensions.cs | 1 + .../JobEventRetentionServiceLoggingTests.cs | 55 +++++ .../Sheddueller.Dashboard.Tests.csproj | 4 + .../PostgresJobEventListenerLoggingTests.cs | 44 ++++ .../Sheddueller.Postgres.Tests.csproj | 1 + .../JobContextLoggingTests.cs | 44 ++++ .../Logging/TestLoggerProvider.cs | 94 ++++++++ .../RecurringScheduleManagerTests.cs | 4 +- test/Sheddueller.Tests/RegistrationTests.cs | 2 + .../Sheddueller.Worker.Tests.csproj | 4 + .../WorkerLoggingTests.cs | 206 ++++++++++++++++++ 29 files changed, 1043 insertions(+), 44 deletions(-) create mode 100644 docs/logging.md create mode 100644 src/Sheddueller.Dashboard/Internal/ShedduellerDashboardLoggerMessages.cs create mode 100644 src/Sheddueller.Postgres/Internal/ShedduellerPostgresLoggerMessages.cs create mode 100644 src/Sheddueller.Worker/Internal/ShedduellerWorkerLoggerMessages.cs create mode 100644 src/Sheddueller/Logging/ShedduellerLoggerMessages.cs create mode 100644 test/Sheddueller.Dashboard.Tests/JobEventRetentionServiceLoggingTests.cs create mode 100644 test/Sheddueller.Postgres.Tests/PostgresJobEventListenerLoggingTests.cs create mode 100644 test/Sheddueller.Tests/JobContextLoggingTests.cs create mode 100644 test/Sheddueller.Tests/Logging/TestLoggerProvider.cs create mode 100644 test/Sheddueller.Worker.Tests/WorkerLoggingTests.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index 2a77d19..4526bbb 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -8,6 +8,7 @@ + diff --git a/docs/logging.md b/docs/logging.md new file mode 100644 index 0000000..f113a20 --- /dev/null +++ b/docs/logging.md @@ -0,0 +1,56 @@ +# Logging + +Sheddueller uses `Microsoft.Extensions.Logging` for runtime diagnostics. Logs describe library behavior; durable job logs written through `IJobContext.LogAsync` remain separate job-history events for dashboard inspection. + +## LoggerMessage Pattern + +Use source-generated `LoggerMessage` extension methods for all Sheddueller diagnostic logs. + +```csharp +namespace Microsoft.Extensions.Logging; + +internal static partial class ShedduellerWorkerLoggerMessages +{ + private const int EventIdStart = 1100; + + [LoggerMessage( + EventIdStart + 20, + LogLevel.Debug, + "Completed job {JobId} attempt {AttemptNumber} on node {NodeId}.")] + public static partial void JobCompleted( + this ILogger logger, + Guid jobId, + int attemptNumber, + string nodeId); +} +``` + +Keep structured parameters primitive or simple primitive collections. Do not log serialized payloads, method argument values, connection strings, arbitrary object graphs, or Serilog-specific destructuring syntax. + +Use `ElapsedMs` with `long` and `{ElapsedMs:D} ms` when logging timings. + +## EventId Ranges + +Assign EventIds by assembly and group related events within each range. + +| Assembly | Range | Current groups | +| --- | ---: | --- | +| `Sheddueller` | `1000-1099` | enqueue, cancellation, recurring schedules, concurrency groups, durable event append diagnostics | +| `Sheddueller.Worker` | `1100-1199` | worker lifecycle, claim loop, execution outcomes, leases, cancellation, recovery/materialization | +| `Sheddueller.Postgres` | `1200-1299` | wake listener, job-event listener, notification publishing | +| `Sheddueller.Dashboard` | `1300-1399` | listener dispatch, live update publishing, event retention cleanup | +| Future packages/providers | `1400-1499` | reserved | + +## Testing + +Prefer EventId-focused tests for diagnostic behavior. Capture and assert the EventId, level, message template, exception, and structured properties instead of relying only on rendered message text. + +Use the repository-level verification commands after adding or changing logging: + +```bash +dotnet clean +dotnet build Sheddueller.slnx --configuration Debug +dotnet build Sheddueller.slnx --configuration Release +dotnet test --solution Sheddueller.slnx --configuration Release +dotnet format Sheddueller.slnx --verify-no-changes --verbosity minimal +``` diff --git a/src/Sheddueller.Dashboard/Internal/DashboardJobEventListenerService.cs b/src/Sheddueller.Dashboard/Internal/DashboardJobEventListenerService.cs index dd71a12..5c55157 100644 --- a/src/Sheddueller.Dashboard/Internal/DashboardJobEventListenerService.cs +++ b/src/Sheddueller.Dashboard/Internal/DashboardJobEventListenerService.cs @@ -1,15 +1,18 @@ namespace Sheddueller.Dashboard.Internal; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Sheddueller.Runtime; internal sealed class DashboardJobEventListenerService( - IEnumerable listeners) : BackgroundService + IEnumerable listeners, + ILogger logger) : BackgroundService { protected override Task ExecuteAsync(CancellationToken stoppingToken) { var snapshot = listeners.ToArray(); + logger.DashboardJobEventListenerServiceStarted(snapshot.Length); return snapshot.Length == 0 ? Task.CompletedTask : Task.WhenAll(snapshot.Select(listener => listener.ListenAsync(stoppingToken))); diff --git a/src/Sheddueller.Dashboard/Internal/JobEventRetentionService.cs b/src/Sheddueller.Dashboard/Internal/JobEventRetentionService.cs index 9077bac..811528a 100644 --- a/src/Sheddueller.Dashboard/Internal/JobEventRetentionService.cs +++ b/src/Sheddueller.Dashboard/Internal/JobEventRetentionService.cs @@ -1,7 +1,10 @@ namespace Sheddueller.Dashboard.Internal; +using System.Diagnostics.CodeAnalysis; + using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Sheddueller.Dashboard; @@ -9,15 +12,29 @@ namespace Sheddueller.Dashboard.Internal; internal sealed class JobEventRetentionService( IServiceProvider serviceProvider, - IOptions options) : BackgroundService + IOptions options, + ILogger logger) : BackgroundService { + [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Retention cleanup failures are diagnostic and should not stop the dashboard host.")] protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { while (!stoppingToken.IsCancellationRequested) { - await this.CleanupOnceAsync(stoppingToken).ConfigureAwait(false); + try + { + await this.CleanupOnceAsync(stoppingToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + throw; + } + catch (Exception exception) + { + logger.DashboardEventRetentionCleanupFailed(exception); + } + await Task.Delay(TimeSpan.FromHours(1), stoppingToken).ConfigureAwait(false); } } @@ -31,9 +48,14 @@ private async ValueTask CleanupOnceAsync(CancellationToken cancellationToken) var store = serviceProvider.GetService(); if (store is null) { + logger.DashboardEventRetentionStoreMissing(); return; } - await store.CleanupAsync(options.Value.EventRetention, cancellationToken).ConfigureAwait(false); + var deleted = await store.CleanupAsync(options.Value.EventRetention, cancellationToken).ConfigureAwait(false); + if (deleted > 0) + { + logger.DashboardEventRetentionCleaned(deleted); + } } } diff --git a/src/Sheddueller.Dashboard/Internal/ShedduellerDashboardLoggerMessages.cs b/src/Sheddueller.Dashboard/Internal/ShedduellerDashboardLoggerMessages.cs new file mode 100644 index 0000000..4b2718b --- /dev/null +++ b/src/Sheddueller.Dashboard/Internal/ShedduellerDashboardLoggerMessages.cs @@ -0,0 +1,47 @@ +namespace Microsoft.Extensions.Logging; + +internal static partial class ShedduellerDashboardLoggerMessages +{ + private const int EventIdStart = 1300; + + [LoggerMessage( + EventIdStart + 0, + LogLevel.Debug, + "Dashboard job-event listener service started with {ListenerCount} listeners.")] + public static partial void DashboardJobEventListenerServiceStarted( + this ILogger logger, + int listenerCount); + + [LoggerMessage( + EventIdStart + 10, + LogLevel.Warning, + "Dashboard failed to publish live job event {EventSequence} for job {JobId}.")] + public static partial void DashboardJobEventPublishFailed( + this ILogger logger, + Exception exception, + Guid jobId, + long eventSequence); + + [LoggerMessage( + EventIdStart + 20, + LogLevel.Debug, + "Dashboard job-event retention cleanup skipped because no retention store is registered.")] + public static partial void DashboardEventRetentionStoreMissing( + this ILogger logger); + + [LoggerMessage( + EventIdStart + 21, + LogLevel.Information, + "Dashboard job-event retention cleanup deleted {DeletedCount} events.")] + public static partial void DashboardEventRetentionCleaned( + this ILogger logger, + int deletedCount); + + [LoggerMessage( + EventIdStart + 22, + LogLevel.Warning, + "Dashboard job-event retention cleanup failed.")] + public static partial void DashboardEventRetentionCleanupFailed( + this ILogger logger, + Exception exception); +} diff --git a/src/Sheddueller.Dashboard/Internal/SignalRJobEventNotifier.cs b/src/Sheddueller.Dashboard/Internal/SignalRJobEventNotifier.cs index ce33c64..7292336 100644 --- a/src/Sheddueller.Dashboard/Internal/SignalRJobEventNotifier.cs +++ b/src/Sheddueller.Dashboard/Internal/SignalRJobEventNotifier.cs @@ -1,21 +1,31 @@ namespace Sheddueller.Dashboard.Internal; using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging; using Sheddueller.Storage; internal sealed class SignalRJobEventNotifier( IHubContext hubContext, - DashboardLiveUpdateStream stream) : IJobEventNotifier + DashboardLiveUpdateStream stream, + ILogger logger) : IJobEventNotifier { public async ValueTask NotifyAsync( JobEvent jobEvent, CancellationToken cancellationToken = default) { - await stream.NotifyAsync(jobEvent, cancellationToken).ConfigureAwait(false); - await hubContext.Clients.All.SendAsync("jobEvent", jobEvent, cancellationToken).ConfigureAwait(false); - await hubContext.Clients.Group(DashboardUpdatesHub.JobGroupName(jobEvent.JobId.ToString("N"))) - .SendAsync("jobEvent", jobEvent, cancellationToken) - .ConfigureAwait(false); + try + { + await stream.NotifyAsync(jobEvent, cancellationToken).ConfigureAwait(false); + await hubContext.Clients.All.SendAsync("jobEvent", jobEvent, cancellationToken).ConfigureAwait(false); + await hubContext.Clients.Group(DashboardUpdatesHub.JobGroupName(jobEvent.JobId.ToString("N"))) + .SendAsync("jobEvent", jobEvent, cancellationToken) + .ConfigureAwait(false); + } + catch (Exception exception) + { + logger.DashboardJobEventPublishFailed(exception, jobEvent.JobId, jobEvent.EventSequence); + throw; + } } } diff --git a/src/Sheddueller.Postgres/Internal/PostgresJobEventListener.cs b/src/Sheddueller.Postgres/Internal/PostgresJobEventListener.cs index 199a614..7a3be33 100644 --- a/src/Sheddueller.Postgres/Internal/PostgresJobEventListener.cs +++ b/src/Sheddueller.Postgres/Internal/PostgresJobEventListener.cs @@ -2,6 +2,8 @@ namespace Sheddueller.Postgres.Internal; using System.Globalization; +using Microsoft.Extensions.Logging; + using Npgsql; using Sheddueller.Postgres.Internal.Operations; @@ -10,8 +12,11 @@ namespace Sheddueller.Postgres.Internal; internal sealed class PostgresJobEventListener( ShedduellerPostgresOptions options, - IJobEventNotifier publisher) : IShedduellerJobEventListener + IJobEventNotifier publisher, + ILogger logger) : IShedduellerJobEventListener { + private static readonly TimeSpan ListenerRetryDelay = TimeSpan.FromSeconds(1); + private readonly PostgresOperationContext _context = new(options); public async Task ListenAsync(CancellationToken cancellationToken) @@ -26,9 +31,10 @@ public async Task ListenAsync(CancellationToken cancellationToken) { return; } - catch (Exception) + catch (Exception exception) { - await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); + logger.PostgresJobEventListenerRetrying(exception, options.SchemaName, (long)ListenerRetryDelay.TotalMilliseconds); + await Task.Delay(ListenerRetryDelay, cancellationToken).ConfigureAwait(false); } } } @@ -43,6 +49,7 @@ private async Task ListenUntilDisconnectedAsync(CancellationToken cancellationTo await using var command = connection.CreateCommand(); command.CommandText = $"listen {PostgresNames.JobEventChannel};"; await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); + logger.PostgresJobEventListenerStarted(options.SchemaName); while (!cancellationToken.IsCancellationRequested) { @@ -56,21 +63,37 @@ private async Task ListenUntilDisconnectedAsync(CancellationToken cancellationTo } private void OnNotification(object sender, NpgsqlNotificationEventArgs args) + => this.HandleNotificationPayload(args.Payload); + + internal void HandleNotificationPayload(string payload) { - if (!TryParsePayload(args.Payload, options.SchemaName, out var jobId, out var eventSequence)) + if (!TryParsePayload(payload, options.SchemaName, out var jobId, out var eventSequence)) { + logger.PostgresJobEventNotificationPayloadInvalid(options.SchemaName); return; } - _ = Task.Run(async () => + _ = Task.Run(() => this.PublishNotificationAsync(jobId, eventSequence)); + } + + private async Task PublishNotificationAsync(Guid jobId, long eventSequence) + { + try { var jobEvent = await PostgresJobEvents.ReadEventAsync(this._context, jobId, eventSequence, CancellationToken.None) .ConfigureAwait(false); - if (jobEvent is not null) + if (jobEvent is null) { - await publisher.NotifyAsync(jobEvent, CancellationToken.None).ConfigureAwait(false); + logger.PostgresJobEventNotificationMissing(jobId, eventSequence); + return; } - }); + + await publisher.NotifyAsync(jobEvent, CancellationToken.None).ConfigureAwait(false); + } + catch (Exception exception) + { + logger.PostgresJobEventNotificationFailed(exception, jobId, eventSequence); + } } private static bool TryParsePayload( diff --git a/src/Sheddueller.Postgres/Internal/PostgresWakeSignal.cs b/src/Sheddueller.Postgres/Internal/PostgresWakeSignal.cs index f98fa03..0442abb 100644 --- a/src/Sheddueller.Postgres/Internal/PostgresWakeSignal.cs +++ b/src/Sheddueller.Postgres/Internal/PostgresWakeSignal.cs @@ -2,12 +2,18 @@ namespace Sheddueller.Postgres.Internal; +using Microsoft.Extensions.Logging; + using Npgsql; using Sheddueller.Runtime; -internal sealed class PostgresWakeSignal(ShedduellerPostgresOptions options) : IShedduellerWakeSignal, IDisposable +internal sealed class PostgresWakeSignal( + ShedduellerPostgresOptions options, + ILogger logger) : IShedduellerWakeSignal, IDisposable { + private static readonly TimeSpan ListenerRetryDelay = TimeSpan.FromSeconds(1); + private readonly SemaphoreSlim _signal = new(0); private readonly CancellationTokenSource _disposeTokenSource = new(); private readonly Lock _listenerLock = new(); @@ -60,9 +66,10 @@ private async Task ListenAsync() { return; } - catch (Exception) + catch (Exception exception) { - await Task.Delay(TimeSpan.FromSeconds(1), this._disposeTokenSource.Token).ConfigureAwait(false); + logger.PostgresWakeListenerRetrying(exception, this._options.SchemaName, (long)ListenerRetryDelay.TotalMilliseconds); + await Task.Delay(ListenerRetryDelay, this._disposeTokenSource.Token).ConfigureAwait(false); } } } @@ -77,6 +84,7 @@ private async Task ListenUntilDisconnectedAsync(CancellationToken cancellationTo await using var command = connection.CreateCommand(); command.CommandText = $"listen {PostgresNames.WakeupChannel};"; await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); + logger.PostgresWakeListenerStarted(this._options.SchemaName); while (!cancellationToken.IsCancellationRequested) { diff --git a/src/Sheddueller.Postgres/Internal/ShedduellerPostgresLoggerMessages.cs b/src/Sheddueller.Postgres/Internal/ShedduellerPostgresLoggerMessages.cs new file mode 100644 index 0000000..5b3fedb --- /dev/null +++ b/src/Sheddueller.Postgres/Internal/ShedduellerPostgresLoggerMessages.cs @@ -0,0 +1,69 @@ +namespace Microsoft.Extensions.Logging; + +internal static partial class ShedduellerPostgresLoggerMessages +{ + private const int EventIdStart = 1200; + + [LoggerMessage( + EventIdStart + 0, + LogLevel.Debug, + "PostgreSQL wake listener started for schema {SchemaName}.")] + public static partial void PostgresWakeListenerStarted( + this ILogger logger, + string schemaName); + + [LoggerMessage( + EventIdStart + 1, + LogLevel.Warning, + "PostgreSQL wake listener for schema {SchemaName} disconnected; retrying in {RetryDelayMs:D} ms.")] + public static partial void PostgresWakeListenerRetrying( + this ILogger logger, + Exception exception, + string schemaName, + long retryDelayMs); + + [LoggerMessage( + EventIdStart + 10, + LogLevel.Debug, + "PostgreSQL job-event listener started for schema {SchemaName}.")] + public static partial void PostgresJobEventListenerStarted( + this ILogger logger, + string schemaName); + + [LoggerMessage( + EventIdStart + 11, + LogLevel.Warning, + "PostgreSQL job-event listener for schema {SchemaName} disconnected; retrying in {RetryDelayMs:D} ms.")] + public static partial void PostgresJobEventListenerRetrying( + this ILogger logger, + Exception exception, + string schemaName, + long retryDelayMs); + + [LoggerMessage( + EventIdStart + 12, + LogLevel.Debug, + "Ignored PostgreSQL job-event notification with invalid payload for schema {SchemaName}.")] + public static partial void PostgresJobEventNotificationPayloadInvalid( + this ILogger logger, + string schemaName); + + [LoggerMessage( + EventIdStart + 13, + LogLevel.Debug, + "PostgreSQL job event {EventSequence} for job {JobId} was not found after notification.")] + public static partial void PostgresJobEventNotificationMissing( + this ILogger logger, + Guid jobId, + long eventSequence); + + [LoggerMessage( + EventIdStart + 14, + LogLevel.Warning, + "Failed to publish PostgreSQL job event {EventSequence} for job {JobId}.")] + public static partial void PostgresJobEventNotificationFailed( + this ILogger logger, + Exception exception, + Guid jobId, + long eventSequence); +} diff --git a/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs b/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs index 4773669..754bad8 100644 --- a/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs +++ b/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs @@ -6,6 +6,7 @@ namespace Sheddueller.Worker.Internal; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Sheddueller; @@ -21,7 +22,9 @@ internal sealed class ShedduellerWorker( TimeProvider timeProvider, IShedduellerWakeSignal wakeSignal, IShedduellerNodeIdProvider nodeIdProvider, - IJobEventSink jobEventSink) : BackgroundService + IJobEventSink jobEventSink, + ILogger logger, + ILogger jobContextLogger) : BackgroundService { private readonly IServiceProvider _serviceProvider = serviceProvider; private readonly IServiceScopeFactory _scopeFactory = scopeFactory; @@ -30,11 +33,14 @@ internal sealed class ShedduellerWorker( private readonly IShedduellerWakeSignal _wakeSignal = wakeSignal; private readonly IShedduellerNodeIdProvider _nodeIdProvider = nodeIdProvider; private readonly IJobEventSink _jobEventSink = jobEventSink; + private readonly ILogger _logger = logger; + private readonly ILogger _jobContextLogger = jobContextLogger; private readonly ConcurrentDictionary _runningJobs = new(); protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var store = this._serviceProvider.GetRequiredService(); + this._logger.WorkerStarted(this._nodeIdProvider.NodeId); try { @@ -57,6 +63,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } claimedJob = true; + this._logger.JobClaimed(claimed.Job.JobId, claimed.Job.AttemptCount, this._nodeIdProvider.NodeId); this.TrackRunningJob(this.ExecuteClaimedJobAsync(store, claimed.Job, stoppingToken)); } @@ -72,8 +79,14 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { // Shutdown stops claiming. Running jobs are awaited below so terminal state can be recorded. } + catch (Exception exception) + { + this._logger.WorkerFailed(exception, this._nodeIdProvider.NodeId); + throw; + } await this.WaitForRunningJobsAsync().ConfigureAwait(false); + this._logger.WorkerStopped(this._nodeIdProvider.NodeId); } private async ValueTask WaitForWorkOrCapacityAsync(CancellationToken stoppingToken) @@ -104,29 +117,42 @@ private async Task ExecuteClaimedJobAsync(IJobStore store, ClaimedJob job, Cance try { await this.InvokeClaimedJobAsync(job, executionTokenSource.Token).ConfigureAwait(false); - await store + var completed = await store .MarkCompletedAsync(new CompleteJobRequest(job.JobId, this._nodeIdProvider.NodeId, job.LeaseToken, this._timeProvider.GetUtcNow()), CancellationToken.None) .ConfigureAwait(false); + if (completed) + { + this._logger.JobCompleted(job.JobId, job.AttemptCount, this._nodeIdProvider.NodeId); + } } catch (OperationCanceledException) when (executionTokenSource.IsCancellationRequested) { if (cancellationState.CancellationRequestedAtUtc is not null) { - await store + var observed = await store .MarkCancellationObservedAsync( new ObserveJobCancellationRequest(job.JobId, this._nodeIdProvider.NodeId, job.LeaseToken, this._timeProvider.GetUtcNow()), CancellationToken.None) .ConfigureAwait(false); + if (observed) + { + this._logger.JobCancellationObserved(job.JobId, job.AttemptCount, this._nodeIdProvider.NodeId); + } } else { - await store + var released = await store .ReleaseJobAsync(new ReleaseJobRequest(job.JobId, this._nodeIdProvider.NodeId, job.LeaseToken, this._timeProvider.GetUtcNow()), CancellationToken.None) .ConfigureAwait(false); + if (released) + { + this._logger.JobReleased(job.JobId, job.AttemptCount, this._nodeIdProvider.NodeId); + } } } catch (Exception exception) { + this._logger.JobFailed(exception, job.JobId, job.AttemptCount, this._nodeIdProvider.NodeId); await store .MarkFailedAsync( new FailJobRequest(job.JobId, this._nodeIdProvider.NodeId, job.LeaseToken, this._timeProvider.GetUtcNow(), CreateFailureInfo(exception)), @@ -136,7 +162,7 @@ await store finally { await executionTokenSource.CancelAsync().ConfigureAwait(false); - await WaitForHeartbeatTaskAsync(heartbeatTask).ConfigureAwait(false); + await this.WaitForHeartbeatTaskAsync(heartbeatTask, job).ConfigureAwait(false); executionTokenSource.Dispose(); } } @@ -149,7 +175,7 @@ private async ValueTask InvokeClaimedJobAsync(ClaimedJob job, CancellationToken var serializableParameterTypes = methodParameterTypes .Where((_, index) => parameterBindings[index].Kind == JobMethodParameterBindingKind.Serialized) .ToArray(); - var jobContext = new JobContext(job.JobId, job.AttemptCount, this._jobEventSink, executionToken); + var jobContext = new JobContext(job.JobId, job.AttemptCount, this._jobEventSink, this._jobContextLogger, executionToken); var scope = this._scopeFactory.CreateAsyncScope(); await using (scope.ConfigureAwait(false)) @@ -335,6 +361,7 @@ await store if (recovered > 0 || materialized > 0) { this._wakeSignal.Notify(); + this._logger.WorkerPeriodicStoreWorkCompleted(this._nodeIdProvider.NodeId, recovered, materialized); } } @@ -359,6 +386,7 @@ private async Task RenewLeaseUntilStoppedAsync( if (!renewed) { + this._logger.JobLeaseRenewalLost(job.JobId, job.AttemptCount, this._nodeIdProvider.NodeId); await executionTokenSource.CancelAsync().ConfigureAwait(false); return; } @@ -371,6 +399,7 @@ private async Task RenewLeaseUntilStoppedAsync( if (cancellationRequestedAtUtc is not null && cancellationState.CancellationRequestedAtUtc is null) { cancellationState.CancellationRequestedAtUtc = cancellationRequestedAtUtc; + this._logger.JobCancellationRequestedObserved(job.JobId, job.AttemptCount, this._nodeIdProvider.NodeId); await executionTokenSource.CancelAsync().ConfigureAwait(false); return; } @@ -383,14 +412,15 @@ private async Task RenewLeaseUntilStoppedAsync( } [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Heartbeat failures should not fault the job execution cleanup path.")] - private static async ValueTask WaitForHeartbeatTaskAsync(Task heartbeatTask) + private async ValueTask WaitForHeartbeatTaskAsync(Task heartbeatTask, ClaimedJob job) { try { await heartbeatTask.ConfigureAwait(false); } - catch + catch (Exception exception) { + this._logger.JobHeartbeatFailed(exception, job.JobId, job.AttemptCount, this._nodeIdProvider.NodeId); // Failure to renew will be handled by lease expiry recovery. } } diff --git a/src/Sheddueller.Worker/Internal/ShedduellerWorkerLoggerMessages.cs b/src/Sheddueller.Worker/Internal/ShedduellerWorkerLoggerMessages.cs new file mode 100644 index 0000000..fdb43c1 --- /dev/null +++ b/src/Sheddueller.Worker/Internal/ShedduellerWorkerLoggerMessages.cs @@ -0,0 +1,123 @@ +namespace Microsoft.Extensions.Logging; + +internal static partial class ShedduellerWorkerLoggerMessages +{ + private const int EventIdStart = 1100; + + [LoggerMessage( + EventIdStart + 0, + LogLevel.Information, + "Sheddueller worker node {NodeId} started.")] + public static partial void WorkerStarted( + this ILogger logger, + string nodeId); + + [LoggerMessage( + EventIdStart + 1, + LogLevel.Information, + "Sheddueller worker node {NodeId} stopped.")] + public static partial void WorkerStopped( + this ILogger logger, + string nodeId); + + [LoggerMessage( + EventIdStart + 2, + LogLevel.Error, + "Sheddueller worker node {NodeId} stopped unexpectedly.")] + public static partial void WorkerFailed( + this ILogger logger, + Exception exception, + string nodeId); + + [LoggerMessage( + EventIdStart + 10, + LogLevel.Debug, + "Claimed job {JobId} for attempt {AttemptNumber} on node {NodeId}.")] + public static partial void JobClaimed( + this ILogger logger, + Guid jobId, + int attemptNumber, + string nodeId); + + [LoggerMessage( + EventIdStart + 20, + LogLevel.Debug, + "Completed job {JobId} attempt {AttemptNumber} on node {NodeId}.")] + public static partial void JobCompleted( + this ILogger logger, + Guid jobId, + int attemptNumber, + string nodeId); + + [LoggerMessage( + EventIdStart + 21, + LogLevel.Error, + "Job {JobId} attempt {AttemptNumber} failed on node {NodeId}.")] + public static partial void JobFailed( + this ILogger logger, + Exception exception, + Guid jobId, + int attemptNumber, + string nodeId); + + [LoggerMessage( + EventIdStart + 30, + LogLevel.Information, + "Cancellation was observed for job {JobId} attempt {AttemptNumber} on node {NodeId}.")] + public static partial void JobCancellationObserved( + this ILogger logger, + Guid jobId, + int attemptNumber, + string nodeId); + + [LoggerMessage( + EventIdStart + 31, + LogLevel.Warning, + "Released job {JobId} attempt {AttemptNumber} on node {NodeId} before completion.")] + public static partial void JobReleased( + this ILogger logger, + Guid jobId, + int attemptNumber, + string nodeId); + + [LoggerMessage( + EventIdStart + 32, + LogLevel.Warning, + "Lost lease for job {JobId} attempt {AttemptNumber} on node {NodeId}.")] + public static partial void JobLeaseRenewalLost( + this ILogger logger, + Guid jobId, + int attemptNumber, + string nodeId); + + [LoggerMessage( + EventIdStart + 33, + LogLevel.Information, + "Cancellation was requested for job {JobId} attempt {AttemptNumber} on node {NodeId}.")] + public static partial void JobCancellationRequestedObserved( + this ILogger logger, + Guid jobId, + int attemptNumber, + string nodeId); + + [LoggerMessage( + EventIdStart + 34, + LogLevel.Warning, + "Heartbeat task failed for job {JobId} attempt {AttemptNumber} on node {NodeId}.")] + public static partial void JobHeartbeatFailed( + this ILogger logger, + Exception exception, + Guid jobId, + int attemptNumber, + string nodeId); + + [LoggerMessage( + EventIdStart + 40, + LogLevel.Information, + "Worker node {NodeId} recovered {RecoveredCount} expired leases and materialized {MaterializedCount} recurring schedule occurrences.")] + public static partial void WorkerPeriodicStoreWorkCompleted( + this ILogger logger, + string nodeId, + int recoveredCount, + int materializedCount); +} diff --git a/src/Sheddueller/Enqueueing/JobEnqueuer.cs b/src/Sheddueller/Enqueueing/JobEnqueuer.cs index b1328e6..c081b07 100644 --- a/src/Sheddueller/Enqueueing/JobEnqueuer.cs +++ b/src/Sheddueller/Enqueueing/JobEnqueuer.cs @@ -2,6 +2,7 @@ namespace Sheddueller.Enqueueing; using System.Linq.Expressions; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Sheddueller.Runtime; @@ -13,7 +14,8 @@ internal sealed class JobEnqueuer( IJobPayloadSerializer serializer, IOptions options, TimeProvider timeProvider, - IShedduellerWakeSignal wakeSignal) : IJobEnqueuer + IShedduellerWakeSignal wakeSignal, + ILogger logger) : IJobEnqueuer { public ValueTask EnqueueAsync( Expression> work, @@ -73,11 +75,14 @@ public async ValueTask> EnqueueManyAsync( throw new InvalidOperationException("The job store returned a result count that does not match the submitted batch size."); } - if (results.Any(result => result.WasEnqueued)) + var enqueuedCount = results.Count(result => result.WasEnqueued); + if (enqueuedCount > 0) { wakeSignal.Notify(); } + logger.JobsBatchEnqueued(requests.Length, enqueuedCount); + var jobIds = new Guid[results.Count]; for (var i = 0; i < results.Count; i++) { @@ -102,6 +107,11 @@ private async ValueTask EnqueueCoreAsync( if (result.WasEnqueued) { wakeSignal.Notify(); + logger.JobEnqueued(result.JobId, result.EnqueueSequence); + } + else + { + logger.JobEnqueueDeduplicated(result.JobId, result.EnqueueSequence); } return result.JobId; diff --git a/src/Sheddueller/Logging/ShedduellerLoggerMessages.cs b/src/Sheddueller/Logging/ShedduellerLoggerMessages.cs new file mode 100644 index 0000000..b38f86b --- /dev/null +++ b/src/Sheddueller/Logging/ShedduellerLoggerMessages.cs @@ -0,0 +1,105 @@ +namespace Microsoft.Extensions.Logging; + +internal static partial class ShedduellerLoggerMessages +{ + private const int EventIdStart = 1000; + + [LoggerMessage( + EventIdStart + 0, + LogLevel.Debug, + "Enqueued job {JobId} with sequence {EnqueueSequence}.")] + public static partial void JobEnqueued( + this ILogger logger, + Guid jobId, + long enqueueSequence); + + [LoggerMessage( + EventIdStart + 1, + LogLevel.Debug, + "Reused existing idempotent job {JobId} with sequence {EnqueueSequence}.")] + public static partial void JobEnqueueDeduplicated( + this ILogger logger, + Guid jobId, + long enqueueSequence); + + [LoggerMessage( + EventIdStart + 2, + LogLevel.Debug, + "Submitted {SubmittedCount} jobs and enqueued {EnqueuedCount} new jobs.")] + public static partial void JobsBatchEnqueued( + this ILogger logger, + int submittedCount, + int enqueuedCount); + + [LoggerMessage( + EventIdStart + 10, + LogLevel.Debug, + "Cancel request for job {JobId} returned {Result}.")] + public static partial void JobCancellationRequested( + this ILogger logger, + Guid jobId, + string result); + + [LoggerMessage( + EventIdStart + 20, + LogLevel.Debug, + "Recurring schedule {ScheduleKey} upsert returned {Result}.")] + public static partial void RecurringScheduleUpserted( + this ILogger logger, + string scheduleKey, + string result); + + [LoggerMessage( + EventIdStart + 21, + LogLevel.Debug, + "Recurring schedule {ScheduleKey} trigger returned {Status}.")] + public static partial void RecurringScheduleTriggered( + this ILogger logger, + string scheduleKey, + string status); + + [LoggerMessage( + EventIdStart + 22, + LogLevel.Debug, + "Recurring schedule {ScheduleKey} delete returned {Deleted}.")] + public static partial void RecurringScheduleDeleted( + this ILogger logger, + string scheduleKey, + bool deleted); + + [LoggerMessage( + EventIdStart + 23, + LogLevel.Debug, + "Recurring schedule {ScheduleKey} pause returned {Paused}.")] + public static partial void RecurringSchedulePaused( + this ILogger logger, + string scheduleKey, + bool paused); + + [LoggerMessage( + EventIdStart + 24, + LogLevel.Debug, + "Recurring schedule {ScheduleKey} resume returned {Resumed}.")] + public static partial void RecurringScheduleResumed( + this ILogger logger, + string scheduleKey, + bool resumed); + + [LoggerMessage( + EventIdStart + 30, + LogLevel.Debug, + "Set concurrency group {GroupKey} limit to {Limit}.")] + public static partial void ConcurrencyGroupLimitSet( + this ILogger logger, + string groupKey, + int limit); + + [LoggerMessage( + EventIdStart + 40, + LogLevel.Warning, + "Failed to append durable job event for job {JobId}.")] + public static partial void JobEventAppendFailed( + this ILogger logger, + Exception exception, + Guid jobId); +} diff --git a/src/Sheddueller/Runtime/ConcurrencyGroupManager.cs b/src/Sheddueller/Runtime/ConcurrencyGroupManager.cs index 497df80..b8f46cd 100644 --- a/src/Sheddueller/Runtime/ConcurrencyGroupManager.cs +++ b/src/Sheddueller/Runtime/ConcurrencyGroupManager.cs @@ -1,9 +1,15 @@ namespace Sheddueller.Runtime; +using Microsoft.Extensions.Logging; + using Sheddueller.Enqueueing; using Sheddueller.Storage; -internal sealed class ConcurrencyGroupManager(IJobStore store, TimeProvider timeProvider, IShedduellerWakeSignal wakeSignal) : IConcurrencyGroupManager +internal sealed class ConcurrencyGroupManager( + IJobStore store, + TimeProvider timeProvider, + IShedduellerWakeSignal wakeSignal, + ILogger logger) : IConcurrencyGroupManager { public async ValueTask SetLimitAsync(string groupKey, int limit, CancellationToken cancellationToken = default) { @@ -18,6 +24,7 @@ await store .SetConcurrencyLimitAsync(new SetConcurrencyLimitRequest(groupKey, limit, timeProvider.GetUtcNow()), cancellationToken) .ConfigureAwait(false); wakeSignal.Notify(); + logger.ConcurrencyGroupLimitSet(groupKey, limit); } public ValueTask GetConfiguredLimitAsync(string groupKey, CancellationToken cancellationToken = default) diff --git a/src/Sheddueller/Runtime/JobContext.cs b/src/Sheddueller/Runtime/JobContext.cs index 9c9592c..4b58e32 100644 --- a/src/Sheddueller/Runtime/JobContext.cs +++ b/src/Sheddueller/Runtime/JobContext.cs @@ -2,12 +2,15 @@ namespace Sheddueller.Runtime; using System.Diagnostics.CodeAnalysis; +using Microsoft.Extensions.Logging; + using Sheddueller.Storage; internal sealed class JobContext( Guid jobId, int attemptNumber, IJobEventSink eventSink, + ILogger logger, CancellationToken cancellationToken) : IJobContext { public Guid JobId { get; } = jobId; @@ -65,8 +68,9 @@ private async ValueTask AppendBestEffortAsync( { throw; } - catch (Exception) + catch (Exception exception) { + logger.JobEventAppendFailed(exception, request.JobId); // Best-effort telemetry must not fail the owning job. } } diff --git a/src/Sheddueller/Runtime/JobManager.cs b/src/Sheddueller/Runtime/JobManager.cs index b102c02..f318000 100644 --- a/src/Sheddueller/Runtime/JobManager.cs +++ b/src/Sheddueller/Runtime/JobManager.cs @@ -1,11 +1,20 @@ namespace Sheddueller.Runtime; +using Microsoft.Extensions.Logging; + using Sheddueller.Storage; internal sealed class JobManager( IJobStore store, - TimeProvider timeProvider) : IJobManager + TimeProvider timeProvider, + ILogger logger) : IJobManager { - public ValueTask CancelAsync(Guid jobId, CancellationToken cancellationToken = default) - => store.CancelAsync(new CancelJobRequest(jobId, timeProvider.GetUtcNow()), cancellationToken); + public async ValueTask CancelAsync(Guid jobId, CancellationToken cancellationToken = default) + { + var result = await store.CancelAsync(new CancelJobRequest(jobId, timeProvider.GetUtcNow()), cancellationToken) + .ConfigureAwait(false); + logger.JobCancellationRequested(jobId, result.ToString()); + + return result; + } } diff --git a/src/Sheddueller/Runtime/RecurringScheduleManager.cs b/src/Sheddueller/Runtime/RecurringScheduleManager.cs index 3b87387..8704988 100644 --- a/src/Sheddueller/Runtime/RecurringScheduleManager.cs +++ b/src/Sheddueller/Runtime/RecurringScheduleManager.cs @@ -3,6 +3,7 @@ namespace Sheddueller.Runtime; using System.Linq.Expressions; using System.Runtime.CompilerServices; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Sheddueller.Enqueueing; @@ -15,7 +16,8 @@ internal sealed class RecurringScheduleManager( IJobPayloadSerializer serializer, IOptions options, TimeProvider timeProvider, - IShedduellerWakeSignal wakeSignal) : IRecurringScheduleManager + IShedduellerWakeSignal wakeSignal, + ILogger logger) : IRecurringScheduleManager { public ValueTask CreateOrUpdateAsync( string scheduleKey, @@ -65,25 +67,36 @@ public async ValueTask TriggerAsync( wakeSignal.Notify(); } + logger.RecurringScheduleTriggered(scheduleKey, result.Status.ToString()); + return result; } - public ValueTask DeleteAsync(string scheduleKey, CancellationToken cancellationToken = default) + public async ValueTask DeleteAsync(string scheduleKey, CancellationToken cancellationToken = default) { SubmissionValidator.ValidateScheduleKey(scheduleKey); - return store.DeleteRecurringScheduleAsync(scheduleKey, cancellationToken); + var deleted = await store.DeleteRecurringScheduleAsync(scheduleKey, cancellationToken).ConfigureAwait(false); + logger.RecurringScheduleDeleted(scheduleKey, deleted); + + return deleted; } - public ValueTask PauseAsync(string scheduleKey, CancellationToken cancellationToken = default) + public async ValueTask PauseAsync(string scheduleKey, CancellationToken cancellationToken = default) { SubmissionValidator.ValidateScheduleKey(scheduleKey); - return store.PauseRecurringScheduleAsync(scheduleKey, timeProvider.GetUtcNow(), cancellationToken); + var paused = await store.PauseRecurringScheduleAsync(scheduleKey, timeProvider.GetUtcNow(), cancellationToken).ConfigureAwait(false); + logger.RecurringSchedulePaused(scheduleKey, paused); + + return paused; } - public ValueTask ResumeAsync(string scheduleKey, CancellationToken cancellationToken = default) + public async ValueTask ResumeAsync(string scheduleKey, CancellationToken cancellationToken = default) { SubmissionValidator.ValidateScheduleKey(scheduleKey); - return store.ResumeRecurringScheduleAsync(scheduleKey, timeProvider.GetUtcNow(), cancellationToken); + var resumed = await store.ResumeRecurringScheduleAsync(scheduleKey, timeProvider.GetUtcNow(), cancellationToken).ConfigureAwait(false); + logger.RecurringScheduleResumed(scheduleKey, resumed); + + return resumed; } public ValueTask GetAsync(string scheduleKey, CancellationToken cancellationToken = default) @@ -175,6 +188,7 @@ private async ValueTask CreateOrUpdateCoreAsync( var result = await store.CreateOrUpdateRecurringScheduleAsync(request, cancellationToken).ConfigureAwait(false); wakeSignal.Notify(); + logger.RecurringScheduleUpserted(scheduleKey, result.ToString()); return result; } diff --git a/src/Sheddueller/Sheddueller.csproj b/src/Sheddueller/Sheddueller.csproj index 80045b6..114be10 100644 --- a/src/Sheddueller/Sheddueller.csproj +++ b/src/Sheddueller/Sheddueller.csproj @@ -2,6 +2,7 @@ + all diff --git a/src/Sheddueller/ShedduellerServiceCollectionExtensions.cs b/src/Sheddueller/ShedduellerServiceCollectionExtensions.cs index e3c623d..d0c1ef9 100644 --- a/src/Sheddueller/ShedduellerServiceCollectionExtensions.cs +++ b/src/Sheddueller/ShedduellerServiceCollectionExtensions.cs @@ -27,6 +27,7 @@ public static IServiceCollection AddSheddueller( { ArgumentNullException.ThrowIfNull(services); + services.AddLogging(); services.AddOptions(); services.TryAddSingleton(TimeProvider.System); services.TryAddSingleton(); diff --git a/test/Sheddueller.Dashboard.Tests/JobEventRetentionServiceLoggingTests.cs b/test/Sheddueller.Dashboard.Tests/JobEventRetentionServiceLoggingTests.cs new file mode 100644 index 0000000..22198b3 --- /dev/null +++ b/test/Sheddueller.Dashboard.Tests/JobEventRetentionServiceLoggingTests.cs @@ -0,0 +1,55 @@ +namespace Sheddueller.Dashboard.Tests; + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +using Sheddueller.Dashboard; +using Sheddueller.Dashboard.Internal; +using Sheddueller.Storage; +using Sheddueller.Tests.Logging; + +using Shouldly; + +public sealed class JobEventRetentionServiceLoggingTests +{ + [Fact] + public async Task Cleanup_NonZeroDeletedCount_LogsCleanupCount() + { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var store = new RecordingRetentionStore(3); + var services = new ServiceCollection(); + services.AddSingleton(store); + using var serviceProvider = services.BuildServiceProvider(); + using var logs = new TestLoggerProvider(); + using var loggerFactory = LoggerFactory.Create(builder => builder + .SetMinimumLevel(LogLevel.Trace) + .AddProvider(logs)); + using var service = new JobEventRetentionService( + serviceProvider, + Options.Create(new ShedduellerDashboardOptions { EventRetention = TimeSpan.FromDays(1) }), + loggerFactory.CreateLogger()); + + await service.StartAsync(cancellationTokenSource.Token); + await store.CleanupCalled.Task.WaitAsync(cancellationTokenSource.Token); + await service.StopAsync(cancellationTokenSource.Token); + + var entry = logs.SingleByEventId(1321); + entry.Level.ShouldBe(LogLevel.Information); + entry.Properties["DeletedCount"].ShouldBe(3); + entry.MessageTemplate.ShouldBe("Dashboard job-event retention cleanup deleted {DeletedCount} events."); + } + + private sealed class RecordingRetentionStore(int deletedCount) : IJobEventRetentionStore + { + public TaskCompletionSource CleanupCalled { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public ValueTask CleanupAsync( + TimeSpan retention, + CancellationToken cancellationToken = default) + { + this.CleanupCalled.TrySetResult(); + return ValueTask.FromResult(deletedCount); + } + } +} diff --git a/test/Sheddueller.Dashboard.Tests/Sheddueller.Dashboard.Tests.csproj b/test/Sheddueller.Dashboard.Tests/Sheddueller.Dashboard.Tests.csproj index 83ec564..bc871f6 100644 --- a/test/Sheddueller.Dashboard.Tests/Sheddueller.Dashboard.Tests.csproj +++ b/test/Sheddueller.Dashboard.Tests/Sheddueller.Dashboard.Tests.csproj @@ -7,4 +7,8 @@ + + + + diff --git a/test/Sheddueller.Postgres.Tests/PostgresJobEventListenerLoggingTests.cs b/test/Sheddueller.Postgres.Tests/PostgresJobEventListenerLoggingTests.cs new file mode 100644 index 0000000..86f01a7 --- /dev/null +++ b/test/Sheddueller.Postgres.Tests/PostgresJobEventListenerLoggingTests.cs @@ -0,0 +1,44 @@ +namespace Sheddueller.Postgres.Tests; + +using Microsoft.Extensions.Logging; + +using Sheddueller.Postgres.Internal; +using Sheddueller.Storage; +using Sheddueller.Tests.Logging; + +using Shouldly; + +public sealed class PostgresJobEventListenerLoggingTests +{ + [Fact] + public void Notification_InvalidPayload_LogsIgnoredPayload() + { + using var logs = new TestLoggerProvider(); + using var loggerFactory = LoggerFactory.Create(builder => builder + .SetMinimumLevel(LogLevel.Trace) + .AddProvider(logs)); + var listener = new PostgresJobEventListener( + new ShedduellerPostgresOptions + { + DataSource = null!, + SchemaName = "sheddueller", + }, + new NoOpJobEventNotifier(), + loggerFactory.CreateLogger()); + + listener.HandleNotificationPayload("not-a-valid-payload"); + + var entry = logs.SingleByEventId(1212); + entry.Level.ShouldBe(LogLevel.Debug); + entry.Properties["SchemaName"].ShouldBe("sheddueller"); + entry.MessageTemplate.ShouldBe("Ignored PostgreSQL job-event notification with invalid payload for schema {SchemaName}."); + } + + private sealed class NoOpJobEventNotifier : IJobEventNotifier + { + public ValueTask NotifyAsync( + JobEvent jobEvent, + CancellationToken cancellationToken = default) + => ValueTask.CompletedTask; + } +} diff --git a/test/Sheddueller.Postgres.Tests/Sheddueller.Postgres.Tests.csproj b/test/Sheddueller.Postgres.Tests/Sheddueller.Postgres.Tests.csproj index 9286d4f..3f5dcb3 100644 --- a/test/Sheddueller.Postgres.Tests/Sheddueller.Postgres.Tests.csproj +++ b/test/Sheddueller.Postgres.Tests/Sheddueller.Postgres.Tests.csproj @@ -8,6 +8,7 @@ + diff --git a/test/Sheddueller.Tests/JobContextLoggingTests.cs b/test/Sheddueller.Tests/JobContextLoggingTests.cs new file mode 100644 index 0000000..918df4f --- /dev/null +++ b/test/Sheddueller.Tests/JobContextLoggingTests.cs @@ -0,0 +1,44 @@ +namespace Sheddueller.Tests; + +using Microsoft.Extensions.Logging; + +using Sheddueller.Runtime; +using Sheddueller.Storage; +using Sheddueller.Tests.Logging; + +using Shouldly; + +public sealed class JobContextLoggingTests +{ + [Fact] + public async Task Log_EventSinkFails_LogsDiagnosticAndDoesNotFailJob() + { + var jobId = Guid.NewGuid(); + using var logs = new TestLoggerProvider(); + using var loggerFactory = LoggerFactory.Create(builder => builder + .SetMinimumLevel(LogLevel.Trace) + .AddProvider(logs)); + var context = new JobContext( + jobId, + 1, + new FailingJobEventSink(), + loggerFactory.CreateLogger(), + CancellationToken.None); + + await context.LogAsync(JobLogLevel.Information, "hello"); + + var entry = logs.SingleByEventId(1040); + entry.Level.ShouldBe(LogLevel.Warning); + entry.Exception.ShouldBeOfType(); + entry.Properties["JobId"].ShouldBe(jobId); + entry.MessageTemplate.ShouldBe("Failed to append durable job event for job {JobId}."); + } + + private sealed class FailingJobEventSink : IJobEventSink + { + public ValueTask AppendAsync( + AppendJobEventRequest request, + CancellationToken cancellationToken = default) + => throw new InvalidOperationException("append failed"); + } +} diff --git a/test/Sheddueller.Tests/Logging/TestLoggerProvider.cs b/test/Sheddueller.Tests/Logging/TestLoggerProvider.cs new file mode 100644 index 0000000..2990d30 --- /dev/null +++ b/test/Sheddueller.Tests/Logging/TestLoggerProvider.cs @@ -0,0 +1,94 @@ +namespace Sheddueller.Tests.Logging; + +using System.Collections.Concurrent; + +using Microsoft.Extensions.Logging; + +internal sealed class TestLoggerProvider : ILoggerProvider +{ + private readonly ConcurrentQueue _entries = new(); + + public IReadOnlyList Entries + => [.. this._entries]; + + public ILogger CreateLogger(string categoryName) + => new TestLogger(categoryName, this._entries); + + public void Dispose() + { + } + + public TestLogEntry SingleByEventId(int eventId) + => this.Entries.Single(entry => entry.EventId.Id == eventId); + + public bool HasEventId(int eventId) + => this.Entries.Any(entry => entry.EventId.Id == eventId); + + private sealed class TestLogger( + string categoryName, + ConcurrentQueue entries) : ILogger + { + public IDisposable BeginScope(TState state) + where TState : notnull + => NullScope.Instance; + + public bool IsEnabled(LogLevel logLevel) + => true; + + public void Log( + LogLevel logLevel, + EventId eventId, + TState state, + Exception? exception, + Func formatter) + { + ArgumentNullException.ThrowIfNull(formatter); + + var properties = ReadProperties(state); + properties.TryGetValue("{OriginalFormat}", out var messageTemplate); + + entries.Enqueue(new TestLogEntry( + categoryName, + logLevel, + eventId, + messageTemplate as string, + formatter(state, exception), + exception, + properties)); + } + + private static Dictionary ReadProperties(TState state) + { + if (state is not IEnumerable> pairs) + { + return new Dictionary(StringComparer.Ordinal); + } + + var properties = new Dictionary(StringComparer.Ordinal); + foreach (var pair in pairs) + { + properties[pair.Key] = pair.Value; + } + + return properties; + } + } + + private sealed class NullScope : IDisposable + { + public static readonly NullScope Instance = new(); + + public void Dispose() + { + } + } +} + +internal sealed record TestLogEntry( + string CategoryName, + LogLevel Level, + EventId EventId, + string? MessageTemplate, + string RenderedMessage, + Exception? Exception, + IReadOnlyDictionary Properties); diff --git a/test/Sheddueller.Tests/RecurringScheduleManagerTests.cs b/test/Sheddueller.Tests/RecurringScheduleManagerTests.cs index 7b95ccb..4e46a2a 100644 --- a/test/Sheddueller.Tests/RecurringScheduleManagerTests.cs +++ b/test/Sheddueller.Tests/RecurringScheduleManagerTests.cs @@ -1,5 +1,6 @@ namespace Sheddueller.Tests; +using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using Sheddueller.Runtime; @@ -75,7 +76,8 @@ private static RecurringScheduleManager CreateManager( new SystemTextJsonJobPayloadSerializer(), Options.Create(options ?? new ShedduellerOptions()), TimeProvider.System, - wakeSignal); + wakeSignal, + NullLogger.Instance); private sealed class RecordingWakeSignal : IShedduellerWakeSignal { diff --git a/test/Sheddueller.Tests/RegistrationTests.cs b/test/Sheddueller.Tests/RegistrationTests.cs index a09ef9f..0f89f0a 100644 --- a/test/Sheddueller.Tests/RegistrationTests.cs +++ b/test/Sheddueller.Tests/RegistrationTests.cs @@ -2,6 +2,7 @@ namespace Sheddueller.Tests; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Sheddueller.Runtime; @@ -30,6 +31,7 @@ public void AddSheddueller_CustomProvider_RegistersCoreServices() provider.GetRequiredService().ShouldNotBeNull(); provider.GetRequiredService().ShouldBeSameAs(provider.GetRequiredService()); provider.GetRequiredService().ShouldBeSameAs(serializer); + provider.GetRequiredService().ShouldNotBeNull(); provider.GetServices().ShouldBeEmpty(); provider.GetServices().Count().ShouldBe(1); } diff --git a/test/Sheddueller.Worker.Tests/Sheddueller.Worker.Tests.csproj b/test/Sheddueller.Worker.Tests/Sheddueller.Worker.Tests.csproj index 8287aad..858ad30 100644 --- a/test/Sheddueller.Worker.Tests/Sheddueller.Worker.Tests.csproj +++ b/test/Sheddueller.Worker.Tests/Sheddueller.Worker.Tests.csproj @@ -3,4 +3,8 @@ + + + + diff --git a/test/Sheddueller.Worker.Tests/WorkerLoggingTests.cs b/test/Sheddueller.Worker.Tests/WorkerLoggingTests.cs new file mode 100644 index 0000000..464bceb --- /dev/null +++ b/test/Sheddueller.Worker.Tests/WorkerLoggingTests.cs @@ -0,0 +1,206 @@ +namespace Sheddueller.Worker.Tests; + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +using Sheddueller.Serialization; +using Sheddueller.Storage; +using Sheddueller.Tests.Logging; +using Sheddueller.Worker.Internal; + +using Shouldly; + +public sealed class WorkerLoggingTests +{ + [Fact] + public async Task JobExecution_Exception_LogsFailureAndMarksJobFailed() + { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var job = CreateClaimedJob(); + var store = new SingleClaimJobStore(job); + using var logs = new TestLoggerProvider(); + var services = new ServiceCollection(); + services.AddLogging(builder => builder + .SetMinimumLevel(LogLevel.Trace) + .AddProvider(logs)); + services.AddSingleton(store); + services.AddSingleton(serviceProvider => serviceProvider.GetRequiredService()); + services.AddTransient(); + services.AddShedduellerWorker(builder => builder.ConfigureOptions(options => + { + options.NodeId = "worker-a"; + options.IdlePollingInterval = TimeSpan.FromMilliseconds(10); + options.HeartbeatInterval = TimeSpan.FromSeconds(5); + options.LeaseDuration = TimeSpan.FromSeconds(30); + })); + await using var provider = services.BuildServiceProvider(); + var worker = provider.GetServices().OfType().Single(); + + await worker.StartAsync(cancellationTokenSource.Token); + var failed = await store.Failed.Task.WaitAsync(cancellationTokenSource.Token); + await worker.StopAsync(cancellationTokenSource.Token); + + failed.JobId.ShouldBe(job.JobId); + failed.Failure.ExceptionType.ShouldBe(typeof(InvalidOperationException).FullName); + + var entry = logs.SingleByEventId(1121); + entry.Level.ShouldBe(LogLevel.Error); + entry.Exception.ShouldBeOfType(); + entry.Properties["JobId"].ShouldBe(job.JobId); + entry.Properties["AttemptNumber"].ShouldBe(job.AttemptCount); + entry.Properties["NodeId"].ShouldBe("worker-a"); + entry.MessageTemplate.ShouldBe("Job {JobId} attempt {AttemptNumber} failed on node {NodeId}."); + } + + private static ClaimedJob CreateClaimedJob() + => new( + Guid.NewGuid(), + EnqueueSequence: 1, + Priority: 0, + ServiceType: typeof(FailingJob).AssemblyQualifiedName!, + MethodName: nameof(FailingJob.FailAsync), + MethodParameterTypes: [typeof(CancellationToken).AssemblyQualifiedName!], + SerializedArguments: new SerializedJobPayload(SystemTextJsonJobPayloadSerializer.JsonContentType, "[]"u8.ToArray()), + ConcurrencyGroupKeys: [], + AttemptCount: 1, + MaxAttempts: 1, + LeaseToken: Guid.NewGuid(), + LeaseExpiresAtUtc: DateTimeOffset.UtcNow.AddSeconds(30), + RetryBackoffKind: null, + RetryBaseDelay: null, + RetryMaxDelay: null, + SourceScheduleKey: null, + ScheduledFireAtUtc: null, + MethodParameterBindings: [new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken)]); + + private sealed class FailingJob + { + public Task FailAsync(CancellationToken cancellationToken) + => throw new InvalidOperationException("job failed"); + } + + private sealed class SingleClaimJobStore(ClaimedJob job) : IJobStore + { + private int _claimed; + + public TaskCompletionSource Failed { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public ValueTask EnqueueAsync( + EnqueueJobRequest request, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask> EnqueueManyAsync( + IReadOnlyList requests, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask TryClaimNextAsync( + ClaimJobRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult( + Interlocked.Exchange(ref this._claimed, 1) == 0 + ? new ClaimJobResult.Claimed(job) + : new ClaimJobResult.NoJobAvailable()); + + public ValueTask MarkCompletedAsync( + CompleteJobRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(true); + + public ValueTask MarkFailedAsync( + FailJobRequest request, + CancellationToken cancellationToken = default) + { + this.Failed.TrySetResult(request); + return ValueTask.FromResult(true); + } + + public ValueTask RenewLeaseAsync( + RenewLeaseRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(true); + + public ValueTask ReleaseJobAsync( + ReleaseJobRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(true); + + public ValueTask RecoverExpiredLeasesAsync( + RecoverExpiredLeasesRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(0); + + public ValueTask CancelAsync( + CancelJobRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(JobCancellationResult.NotFound); + + public ValueTask GetCancellationRequestedAtAsync( + JobCancellationStatusRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(null); + + public ValueTask MarkCancellationObservedAsync( + ObserveJobCancellationRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(true); + + public ValueTask RecordWorkerNodeHeartbeatAsync( + WorkerNodeHeartbeatRequest request, + CancellationToken cancellationToken = default) + => ValueTask.CompletedTask; + + public ValueTask SetConcurrencyLimitAsync( + SetConcurrencyLimitRequest request, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask GetConfiguredConcurrencyLimitAsync( + string groupKey, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask CreateOrUpdateRecurringScheduleAsync( + UpsertRecurringScheduleRequest request, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask TriggerRecurringScheduleAsync( + TriggerRecurringScheduleRequest request, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask DeleteRecurringScheduleAsync( + string scheduleKey, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask PauseRecurringScheduleAsync( + string scheduleKey, + DateTimeOffset pausedAtUtc, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask ResumeRecurringScheduleAsync( + string scheduleKey, + DateTimeOffset resumedAtUtc, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask GetRecurringScheduleAsync( + string scheduleKey, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask> ListRecurringSchedulesAsync( + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask MaterializeDueRecurringSchedulesAsync( + MaterializeDueRecurringSchedulesRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(0); + } +} From 5cf25dbdac3bc3dbfdcd3e717cc641b22a783523 Mon Sep 17 00:00:00 2001 From: Brian Tyler Date: Sun, 26 Apr 2026 14:37:00 +0100 Subject: [PATCH 2/6] fix: standardize package version ranges in Directory.Packages.props --- Directory.Packages.props | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index 4526bbb..22485a2 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -1,21 +1,23 @@ true + [10.0.0, 11.0.0) + [10.0.0, 11.0.0) - - - - - - + + + + + + - + - + From bfcb0f8da5ca54d7711e185c7c498fae07485a82 Mon Sep 17 00:00:00 2001 From: Brian Tyler Date: Sun, 26 Apr 2026 15:01:23 +0100 Subject: [PATCH 3/6] feat(invocation): add job invocation metadata and enhance inspection capabilities --- .../Components/Pages/JobDetail.razor | 121 +++++++++++++++ .../Internal/DashboardFormat.cs | 22 ++- .../PostgresJobInspectionOperation.cs | 140 ++++++++++++++++++ .../Internal/ShedduellerWorker.cs | 20 +-- .../Inspection/Jobs/JobInspectionDetail.cs | 5 + .../Jobs/JobInvocationInspection.cs | 16 ++ .../Jobs/JobInvocationParameterInspection.cs | 12 ++ .../JobSerializedArgumentsInspectionStatus.cs | 32 ++++ .../JobMethodParameterBindingResolver.cs | 59 ++++++++ .../DashboardEndpointTests.cs | 47 +++++- .../InspectionContractTests.cs | 103 ++++++++++++- 11 files changed, 551 insertions(+), 26 deletions(-) create mode 100644 src/Sheddueller/Inspection/Jobs/JobInvocationInspection.cs create mode 100644 src/Sheddueller/Inspection/Jobs/JobInvocationParameterInspection.cs create mode 100644 src/Sheddueller/Inspection/Jobs/JobSerializedArgumentsInspectionStatus.cs create mode 100644 src/Sheddueller/Storage/JobMethodParameterBindingResolver.cs diff --git a/src/Sheddueller.Dashboard/Components/Pages/JobDetail.razor b/src/Sheddueller.Dashboard/Components/Pages/JobDetail.razor index 6bf4599..f00a804 100644 --- a/src/Sheddueller.Dashboard/Components/Pages/JobDetail.razor +++ b/src/Sheddueller.Dashboard/Components/Pages/JobDetail.razor @@ -208,6 +208,51 @@
+ @if (_detail.Invocation is { } invocation) + { +
+

Invocation

+ +
+ + + +
+ + @if (invocation.SerializedArgumentsStatus != JobSerializedArgumentsInspectionStatus.Displayable) + { +

@DashboardFormat.InvocationStatus(invocation)

+ } + +
+ @foreach (var parameter in invocation.Parameters) + { +
+
+ @string.Create(CultureInfo.InvariantCulture, $"#{parameter.ParameterIndex + 1}") + @DashboardFormat.ShortTypeName(parameter.ParameterType) + @DashboardFormat.InvocationBinding(parameter) +
+ + @if (parameter.SerializedValueJson is not null) + { +
@parameter.SerializedValueJson
+ } +
+ } +
+
+ } +

Lifecycle Timeline

@@ -571,6 +616,7 @@ .job-detail-mono, .job-detail-metadata-item strong, .job-detail-chip, + .job-detail-parameter pre, .job-detail-progress-percent, .job-detail-log-table, .job-detail-timeline__time span { @@ -944,6 +990,77 @@ line-height: 16px; } + .job-detail-invocation-panel { + display: flex; + flex-direction: column; + gap: 14px; + } + + .job-detail-invocation-summary { + display: grid; + grid-template-columns: minmax(0, 2fr) minmax(120px, 1fr) minmax(160px, 1fr); + gap: 12px; + margin-top: 2px; + } + + .job-detail-parameter-list { + display: flex; + min-width: 0; + flex-direction: column; + border-top: 1px solid var(--sd-outline-variant); + } + + .job-detail-parameter { + min-width: 0; + border-bottom: 1px solid var(--sd-outline-variant); + padding: 10px 0; + } + + .job-detail-parameter:last-child { + border-bottom: 0; + padding-bottom: 0; + } + + .job-detail-parameter__header { + display: flex; + min-width: 0; + align-items: center; + gap: 8px; + } + + .job-detail-parameter__header span, + .job-detail-parameter__header em { + flex: 0 0 auto; + color: var(--sd-on-surface-variant); + font-size: 12px; + font-style: normal; + line-height: 16px; + } + + .job-detail-parameter__header strong { + overflow: hidden; + color: var(--sd-on-surface); + font-size: 13px; + line-height: 18px; + text-overflow: ellipsis; + white-space: nowrap; + } + + .job-detail-parameter pre { + overflow: auto; + max-height: 240px; + margin-top: 8px; + border: 1px solid var(--sd-outline-variant); + border-radius: 2px; + background: var(--sd-surface-container); + color: var(--sd-on-surface); + font-size: 12px; + line-height: 18px; + padding: 10px; + white-space: pre-wrap; + overflow-wrap: anywhere; + } + .job-detail-operations { gap: var(--sd-page-margin); } @@ -1335,6 +1452,10 @@ .job-detail-timeline__time { text-align: left; } + + .job-detail-invocation-summary { + grid-template-columns: 1fr; + } } diff --git a/src/Sheddueller.Dashboard/Internal/DashboardFormat.cs b/src/Sheddueller.Dashboard/Internal/DashboardFormat.cs index 603f096..dd96a39 100644 --- a/src/Sheddueller.Dashboard/Internal/DashboardFormat.cs +++ b/src/Sheddueller.Dashboard/Internal/DashboardFormat.cs @@ -28,6 +28,26 @@ public static string FullHandler(JobInspectionSummary job) public static string ShortHandler(JobInspectionSummary job) => string.Concat(ShortTypeName(job.ServiceType), ".", job.MethodName); + public static string InvocationHandler(JobInvocationInspection invocation) + => string.Concat(ShortTypeName(invocation.ServiceType), ".", invocation.MethodName); + + public static string InvocationBinding(JobInvocationParameterInspection parameter) + => parameter.Binding.Kind switch + { + JobMethodParameterBindingKind.Serialized => "serialized", + JobMethodParameterBindingKind.CancellationToken => "scheduler-owned", + JobMethodParameterBindingKind.JobContext => "Job.Context", + JobMethodParameterBindingKind.Service => string.Create( + CultureInfo.InvariantCulture, + $"Job.Resolve<{ShortTypeName(parameter.Binding.ServiceType ?? parameter.ParameterType)}>()"), + _ => parameter.Binding.Kind.ToString(), + }; + + public static string InvocationStatus(JobInvocationInspection invocation) + => invocation.SerializedArgumentsStatus == JobSerializedArgumentsInspectionStatus.Displayable + ? "Serialized arguments are displayable." + : FirstNonEmpty(invocation.SerializedArgumentsStatusMessage, invocation.SerializedArgumentsStatus.ToString()); + public static string Attempts(JobInspectionSummary job, bool compact = false) => compact ? string.Create(CultureInfo.InvariantCulture, $"{job.AttemptCount}/{job.MaxAttempts}") @@ -292,7 +312,7 @@ public static string LogLevelClass(JobEvent jobEvent) public static string FirstNonEmpty(params string?[] values) => values.FirstOrDefault(static value => !string.IsNullOrWhiteSpace(value)) ?? string.Empty; - private static string ShortTypeName(string typeName) + public static string ShortTypeName(string typeName) { var typeDelimiterIndex = typeName.IndexOf(',', StringComparison.Ordinal); if (typeDelimiterIndex >= 0) diff --git a/src/Sheddueller.Postgres/Internal/Operations/PostgresJobInspectionOperation.cs b/src/Sheddueller.Postgres/Internal/Operations/PostgresJobInspectionOperation.cs index c058a6a..f5501cd 100644 --- a/src/Sheddueller.Postgres/Internal/Operations/PostgresJobInspectionOperation.cs +++ b/src/Sheddueller.Postgres/Internal/Operations/PostgresJobInspectionOperation.cs @@ -2,14 +2,23 @@ namespace Sheddueller.Postgres.Internal.Operations; using System.Globalization; using System.Runtime.CompilerServices; +using System.Text.Json; using Npgsql; using Sheddueller.Inspection.Jobs; +using Sheddueller.Serialization; using Sheddueller.Storage; internal static class PostgresJobInspectionOperation { + private const int SerializedArgumentDisplayByteLimit = 64 * 1024; + + private static readonly JsonSerializerOptions SerializedArgumentDisplayJsonOptions = new() + { + WriteIndented = true, + }; + public static async ValueTask GetOverviewAsync( PostgresOperationContext context, CancellationToken cancellationToken) @@ -134,6 +143,7 @@ await CreateSummaryAsync(context, connection, row, cancellationToken).ConfigureA row.LeaseExpiresAtUtc, row.ScheduledFireAtUtc) { + Invocation = await ReadInvocationAsync(context, connection, row, cancellationToken).ConfigureAwait(false), RetryCloneJobIds = await ReadRetryCloneJobIdsAsync(context, connection, jobId, cancellationToken).ConfigureAwait(false), }; } @@ -389,6 +399,136 @@ from claimable return rows.Count == 0 ? null : rows[0]; } + private static async ValueTask ReadInvocationAsync( + PostgresOperationContext context, + NpgsqlConnection connection, + PostgresJobInspectionRow row, + CancellationToken cancellationToken) + { + await using var command = connection.CreateCommand(); + command.CommandText = + $""" + select + method_parameter_types, + invocation_target_kind, + method_parameter_bindings, + serialized_arguments_content_type, + serialized_arguments + from {context.Names.Jobs} + where job_id = @job_id; + """; + command.Parameters.AddWithValue("job_id", row.JobId); + + await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false); + if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false)) + { + return null; + } + + var parameterTypes = reader.GetFieldValue(0); + var targetKind = PostgresConversion.ToInvocationTargetKind(reader.GetValue(1)); + var parameterBindings = JobMethodParameterBindingResolver.Normalize( + parameterTypes, + PostgresConversion.ToParameterBindings(reader.GetValue(2))); + var payload = PostgresConversion.ToPayload(reader.GetValue(3), reader.GetValue(4)); + + return CreateInvocation(row, targetKind, parameterTypes, parameterBindings, payload); + } + + private static JobInvocationInspection CreateInvocation( + PostgresJobInspectionRow row, + JobInvocationTargetKind targetKind, + string[] parameterTypes, + IReadOnlyList parameterBindings, + SerializedJobPayload payload) + { + var serializedValueJson = new string?[parameterTypes.Length]; + var status = PopulateSerializedValueJson(payload, parameterBindings, serializedValueJson, out var statusMessage); + var parameters = new JobInvocationParameterInspection[parameterTypes.Length]; + + for (var i = 0; i < parameterTypes.Length; i++) + { + parameters[i] = new JobInvocationParameterInspection( + i, + parameterTypes[i], + parameterBindings[i], + serializedValueJson[i]); + } + + return new JobInvocationInspection( + targetKind, + row.ServiceType, + row.MethodName, + parameters, + payload.ContentType, + payload.Data.LongLength, + status, + statusMessage); + } + + private static JobSerializedArgumentsInspectionStatus PopulateSerializedValueJson( + SerializedJobPayload payload, + IReadOnlyList parameterBindings, + string?[] serializedValueJson, + out string? statusMessage) + { + if (!string.Equals(payload.ContentType, SystemTextJsonJobPayloadSerializer.JsonContentType, StringComparison.Ordinal)) + { + statusMessage = string.Create( + CultureInfo.InvariantCulture, + $"Serialized arguments use unsupported content type '{payload.ContentType}'."); + return JobSerializedArgumentsInspectionStatus.UnsupportedContentType; + } + + if (payload.Data.LongLength > SerializedArgumentDisplayByteLimit) + { + statusMessage = string.Create( + CultureInfo.InvariantCulture, + $"Serialized arguments are {payload.Data.LongLength:N0} bytes, exceeding the {SerializedArgumentDisplayByteLimit:N0} byte display limit."); + return JobSerializedArgumentsInspectionStatus.TooLarge; + } + + var serializedParameterIndexes = parameterBindings + .Select((binding, index) => (binding, index)) + .Where(parameter => parameter.binding.Kind == JobMethodParameterBindingKind.Serialized) + .Select(parameter => parameter.index) + .ToArray(); + + try + { + using var document = JsonDocument.Parse(payload.Data); + var root = document.RootElement; + if (root.ValueKind != JsonValueKind.Array) + { + statusMessage = "Serialized arguments payload is not a JSON array."; + return JobSerializedArgumentsInspectionStatus.InvalidPayload; + } + + if (root.GetArrayLength() != serializedParameterIndexes.Length) + { + statusMessage = string.Create( + CultureInfo.InvariantCulture, + $"Serialized arguments contain {root.GetArrayLength()} values for {serializedParameterIndexes.Length} serialized parameters."); + return JobSerializedArgumentsInspectionStatus.ArgumentCountMismatch; + } + + var argumentIndex = 0; + foreach (var element in root.EnumerateArray()) + { + serializedValueJson[serializedParameterIndexes[argumentIndex]] = JsonSerializer.Serialize(element, SerializedArgumentDisplayJsonOptions); + argumentIndex++; + } + + statusMessage = null; + return JobSerializedArgumentsInspectionStatus.Displayable; + } + catch (JsonException) + { + statusMessage = "Serialized arguments payload is not valid JSON."; + return JobSerializedArgumentsInspectionStatus.InvalidPayload; + } + } + private static async ValueTask> ReadRowsAsync( NpgsqlCommand command, CancellationToken cancellationToken) diff --git a/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs b/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs index 754bad8..3eaf1f6 100644 --- a/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs +++ b/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs @@ -308,25 +308,7 @@ private static object ResolveBoundService( private static IReadOnlyList NormalizeParameterBindings( Type[] methodParameterTypes, IReadOnlyList? parameterBindings) - { - if (parameterBindings is { Count: > 0 }) - { - return parameterBindings; - } - - var inferred = new JobMethodParameterBinding[methodParameterTypes.Length]; - for (var i = 0; i < methodParameterTypes.Length; i++) - { - inferred[i] = methodParameterTypes[i] switch - { - Type type when type == typeof(CancellationToken) => new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken), - Type type when type == typeof(IJobContext) => new JobMethodParameterBinding(JobMethodParameterBindingKind.JobContext), - _ => new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized), - }; - } - - return inferred; - } + => JobMethodParameterBindingResolver.Normalize(methodParameterTypes, parameterBindings); private void TrackRunningJob(Task executionTask) { diff --git a/src/Sheddueller/Inspection/Jobs/JobInspectionDetail.cs b/src/Sheddueller/Inspection/Jobs/JobInspectionDetail.cs index c87c678..9d9e9e9 100644 --- a/src/Sheddueller/Inspection/Jobs/JobInspectionDetail.cs +++ b/src/Sheddueller/Inspection/Jobs/JobInspectionDetail.cs @@ -10,6 +10,11 @@ public sealed record JobInspectionDetail( DateTimeOffset? LeaseExpiresAtUtc, DateTimeOffset? ScheduledFireAtUtc) { + /// + /// Reconstructed persisted invocation metadata for this job. + /// + public JobInvocationInspection? Invocation { get; init; } + /// /// Jobs cloned from this failed job. /// diff --git a/src/Sheddueller/Inspection/Jobs/JobInvocationInspection.cs b/src/Sheddueller/Inspection/Jobs/JobInvocationInspection.cs new file mode 100644 index 0000000..bef66af --- /dev/null +++ b/src/Sheddueller/Inspection/Jobs/JobInvocationInspection.cs @@ -0,0 +1,16 @@ +namespace Sheddueller.Inspection.Jobs; + +using Sheddueller.Storage; + +/// +/// Reconstructed persisted invocation metadata for a job. +/// +public sealed record JobInvocationInspection( + JobInvocationTargetKind TargetKind, + string ServiceType, + string MethodName, + IReadOnlyList Parameters, + string SerializedArgumentsContentType, + long SerializedArgumentsByteCount, + JobSerializedArgumentsInspectionStatus SerializedArgumentsStatus, + string? SerializedArgumentsStatusMessage = null); diff --git a/src/Sheddueller/Inspection/Jobs/JobInvocationParameterInspection.cs b/src/Sheddueller/Inspection/Jobs/JobInvocationParameterInspection.cs new file mode 100644 index 0000000..c1f1bc3 --- /dev/null +++ b/src/Sheddueller/Inspection/Jobs/JobInvocationParameterInspection.cs @@ -0,0 +1,12 @@ +namespace Sheddueller.Inspection.Jobs; + +using Sheddueller.Storage; + +/// +/// Reconstructed persisted invocation metadata for one job method parameter. +/// +public sealed record JobInvocationParameterInspection( + int ParameterIndex, + string ParameterType, + JobMethodParameterBinding Binding, + string? SerializedValueJson = null); diff --git a/src/Sheddueller/Inspection/Jobs/JobSerializedArgumentsInspectionStatus.cs b/src/Sheddueller/Inspection/Jobs/JobSerializedArgumentsInspectionStatus.cs new file mode 100644 index 0000000..a0bd23d --- /dev/null +++ b/src/Sheddueller/Inspection/Jobs/JobSerializedArgumentsInspectionStatus.cs @@ -0,0 +1,32 @@ +namespace Sheddueller.Inspection.Jobs; + +/// +/// Describes whether persisted serialized job arguments can be displayed for inspection. +/// +public enum JobSerializedArgumentsInspectionStatus +{ + /// + /// The serialized arguments were parsed and mapped to invocation parameters. + /// + Displayable = 0, + + /// + /// The payload content type is not understood by the built-in inspection renderer. + /// + UnsupportedContentType = 1, + + /// + /// The payload exceeds the inspection display limit. + /// + TooLarge = 2, + + /// + /// The payload was expected to be displayable but could not be parsed. + /// + InvalidPayload = 3, + + /// + /// The payload argument count does not match the reconstructed invocation. + /// + ArgumentCountMismatch = 4, +} diff --git a/src/Sheddueller/Storage/JobMethodParameterBindingResolver.cs b/src/Sheddueller/Storage/JobMethodParameterBindingResolver.cs new file mode 100644 index 0000000..b33d533 --- /dev/null +++ b/src/Sheddueller/Storage/JobMethodParameterBindingResolver.cs @@ -0,0 +1,59 @@ +namespace Sheddueller.Storage; + +internal static class JobMethodParameterBindingResolver +{ + public static IReadOnlyList Normalize( + IReadOnlyList methodParameterTypes, + IReadOnlyList? parameterBindings) + { + ArgumentNullException.ThrowIfNull(methodParameterTypes); + + if (parameterBindings is { Count: > 0 }) + { + return parameterBindings; + } + + var inferred = new JobMethodParameterBinding[methodParameterTypes.Count]; + for (var i = 0; i < methodParameterTypes.Count; i++) + { + inferred[i] = CreateInferredBinding(methodParameterTypes[i]); + } + + return inferred; + } + + public static IReadOnlyList Normalize( + IReadOnlyList methodParameterTypes, + IReadOnlyList? parameterBindings) + { + ArgumentNullException.ThrowIfNull(methodParameterTypes); + + if (parameterBindings is { Count: > 0 }) + { + return parameterBindings; + } + + var inferred = new JobMethodParameterBinding[methodParameterTypes.Count]; + for (var i = 0; i < methodParameterTypes.Count; i++) + { + inferred[i] = CreateInferredBinding(methodParameterTypes[i]); + } + + return inferred; + } + + private static JobMethodParameterBinding CreateInferredBinding(Type parameterType) + => parameterType switch + { + Type type when type == typeof(CancellationToken) => new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken), + Type type when type == typeof(IJobContext) => new JobMethodParameterBinding(JobMethodParameterBindingKind.JobContext), + _ => new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized), + }; + + private static JobMethodParameterBinding CreateInferredBinding(string parameterType) + => string.Equals(parameterType, typeof(CancellationToken).AssemblyQualifiedName, StringComparison.Ordinal) + ? new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken) + : string.Equals(parameterType, typeof(IJobContext).AssemblyQualifiedName, StringComparison.Ordinal) + ? new JobMethodParameterBinding(JobMethodParameterBindingKind.JobContext) + : new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized); +} diff --git a/test/Sheddueller.Dashboard.Tests/DashboardEndpointTests.cs b/test/Sheddueller.Dashboard.Tests/DashboardEndpointTests.cs index 42f5c75..f944ae6 100644 --- a/test/Sheddueller.Dashboard.Tests/DashboardEndpointTests.cs +++ b/test/Sheddueller.Dashboard.Tests/DashboardEndpointTests.cs @@ -16,6 +16,7 @@ namespace Sheddueller.Dashboard.Tests; using Sheddueller.Inspection.Metrics; using Sheddueller.Inspection.Nodes; using Sheddueller.Inspection.Schedules; +using Sheddueller.Serialization; using Sheddueller.Storage; using Shouldly; @@ -307,6 +308,14 @@ public async Task JobDetail_KnownJob_RendersDetailAndDefaultLogFilter() html.ShouldContain("href=\"jobs?handler=StubService.Run\""); html.ShouldContain("href=\"jobs?tag=tenant%3Aacme\""); html.ShouldContain("href=\"jobs?group=tenant-acme\""); + html.ShouldContain("Invocation"); + html.ShouldContain("StubPayload"); + html.ShouldContain("tenantId"); + html.ShouldContain("acme"); + html.ShouldContain("Job.Resolve"); + html.ShouldContain("StubDependency"); + html.ShouldContain("Job.Context"); + html.ShouldContain("scheduler-owned"); html.ShouldContain("Lifecycle Timeline"); html.ShouldContain("Attempt 1 Failed"); html.ShouldContain("ConnectionTimeoutError: db-primary cluster unreachable"); @@ -525,6 +534,37 @@ private sealed class StubJobInspectionReader : IJobInspectionReader private static readonly DateTimeOffset CompletedAtUtc = DateTimeOffset.Parse("2026-04-20T12:09:00Z", CultureInfo.InvariantCulture); private static readonly DateTimeOffset CanceledAtUtc = DateTimeOffset.Parse("2026-04-20T12:02:00Z", CultureInfo.InvariantCulture); private static readonly DateTimeOffset LeaseExpiresAtUtc = DateTimeOffset.Parse("2026-04-20T12:10:00Z", CultureInfo.InvariantCulture); + private static readonly JobInvocationInspection Invocation = new( + JobInvocationTargetKind.Instance, + "Sheddueller.Dashboard.Tests.DashboardEndpointTests.StubService", + "Run", + [ + new JobInvocationParameterInspection( + 0, + "Sheddueller.Dashboard.Tests.DashboardEndpointTests.StubPayload", + new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized), + string.Join( + Environment.NewLine, + "{", + " \"tenantId\": \"acme\",", + " \"chunk\": 4", + "}")), + new JobInvocationParameterInspection( + 1, + "Sheddueller.Dashboard.Tests.DashboardEndpointTests.StubDependency", + new JobMethodParameterBinding(JobMethodParameterBindingKind.Service, "Sheddueller.Dashboard.Tests.DashboardEndpointTests.StubDependency")), + new JobInvocationParameterInspection( + 2, + typeof(IJobContext).AssemblyQualifiedName!, + new JobMethodParameterBinding(JobMethodParameterBindingKind.JobContext)), + new JobInvocationParameterInspection( + 3, + typeof(CancellationToken).AssemblyQualifiedName!, + new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken)), + ], + SystemTextJsonJobPayloadSerializer.JsonContentType, + SerializedArgumentsByteCount: 31, + JobSerializedArgumentsInspectionStatus.Displayable); private static readonly JobInspectionSummary Job = new( JobId, @@ -711,7 +751,7 @@ private static async IAsyncEnumerable ReadEventsCoreAsync( } private static JobInspectionDetail CreateDetail(JobInspectionSummary job) - => job.State == JobState.Queued + => (job.State == JobState.Queued ? new JobInspectionDetail( job, ClaimedAtUtc: null, @@ -723,7 +763,10 @@ private static JobInspectionDetail CreateDetail(JobInspectionSummary job) ClaimedAtUtc, ClaimedByNodeId: "worker-us-east-1a-04", job.State == JobState.Claimed ? LeaseExpiresAtUtc : null, - ScheduledFireAtUtc: null); + ScheduledFireAtUtc: null)) with + { + Invocation = Invocation, + }; } private sealed class StubJobManager : IJobManager diff --git a/test/Sheddueller.ProviderContracts/InspectionContractTests.cs b/test/Sheddueller.ProviderContracts/InspectionContractTests.cs index 60ece44..8a8a73f 100644 --- a/test/Sheddueller.ProviderContracts/InspectionContractTests.cs +++ b/test/Sheddueller.ProviderContracts/InspectionContractTests.cs @@ -1,5 +1,7 @@ namespace Sheddueller.ProviderContracts; +using System.Text.Json; + using Sheddueller.Inspection.ConcurrencyGroups; using Sheddueller.Inspection.Jobs; using Sheddueller.Inspection.Metrics; @@ -68,6 +70,77 @@ public async Task JobSummary_ClaimedAtUtc_IsVisibleForClaimedJobs() detail.ShouldNotBeNull().Summary.ClaimedAtUtc.ShouldBe(detail.ClaimedAtUtc); } + [Fact] + public async Task GetJob_InvocationMetadata_ReconstructsRuntimeBindingsAndJsonArguments() + { + await using var context = await this.CreateContextAsync(); + var jobId = Guid.NewGuid(); + + await context.Store.EnqueueAsync(CreateRequest( + jobId, + serviceType: typeof(InspectionInvocationService).AssemblyQualifiedName, + methodName: nameof(InspectionInvocationService.RunAsync), + methodParameterTypes: + [ + typeof(InspectionPayload).AssemblyQualifiedName!, + typeof(InspectionDependency).AssemblyQualifiedName!, + typeof(IJobContext).AssemblyQualifiedName!, + typeof(CancellationToken).AssemblyQualifiedName!, + ], + serializedArguments: new SerializedJobPayload( + SystemTextJsonJobPayloadSerializer.JsonContentType, + JsonSerializer.SerializeToUtf8Bytes(new[] { new { name = "alpha", count = 42 } })), + methodParameterBindings: + [ + new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized), + new JobMethodParameterBinding(JobMethodParameterBindingKind.Service, typeof(InspectionDependency).AssemblyQualifiedName), + new JobMethodParameterBinding(JobMethodParameterBindingKind.JobContext), + new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken), + ])); + + var detail = await context.Reader.GetJobAsync(jobId); + + var invocation = detail.ShouldNotBeNull().Invocation.ShouldNotBeNull(); + invocation.TargetKind.ShouldBe(JobInvocationTargetKind.Instance); + invocation.ServiceType.ShouldBe(typeof(InspectionInvocationService).AssemblyQualifiedName); + invocation.MethodName.ShouldBe(nameof(InspectionInvocationService.RunAsync)); + invocation.SerializedArgumentsContentType.ShouldBe(SystemTextJsonJobPayloadSerializer.JsonContentType); + invocation.SerializedArgumentsStatus.ShouldBe(JobSerializedArgumentsInspectionStatus.Displayable); + invocation.Parameters.Select(parameter => parameter.Binding.Kind).ShouldBe([ + JobMethodParameterBindingKind.Serialized, + JobMethodParameterBindingKind.Service, + JobMethodParameterBindingKind.JobContext, + JobMethodParameterBindingKind.CancellationToken, + ]); + var valueJson = invocation.Parameters[0].SerializedValueJson.ShouldNotBeNull(); + valueJson.ShouldContain("\"name\": \"alpha\""); + valueJson.ShouldContain("\"count\": 42"); + invocation.Parameters[1].Binding.ServiceType.ShouldBe(typeof(InspectionDependency).AssemblyQualifiedName); + invocation.Parameters.Skip(1).All(parameter => parameter.SerializedValueJson is null).ShouldBeTrue(); + } + + [Fact] + public async Task GetJob_InvocationMetadata_CustomPayloadReportsUnsupportedContentType() + { + await using var context = await this.CreateContextAsync(); + var jobId = Guid.NewGuid(); + + await context.Store.EnqueueAsync(CreateRequest( + jobId, + methodParameterTypes: [typeof(string).AssemblyQualifiedName!], + serializedArguments: new SerializedJobPayload("application/x-test", [1, 2, 3]), + methodParameterBindings: [new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized)])); + + var detail = await context.Reader.GetJobAsync(jobId); + + var invocation = detail.ShouldNotBeNull().Invocation.ShouldNotBeNull(); + invocation.SerializedArgumentsContentType.ShouldBe("application/x-test"); + invocation.SerializedArgumentsByteCount.ShouldBe(3); + invocation.SerializedArgumentsStatus.ShouldBe(JobSerializedArgumentsInspectionStatus.UnsupportedContentType); + invocation.SerializedArgumentsStatusMessage.ShouldNotBeNull().ShouldContain("unsupported content type"); + invocation.Parameters.ShouldHaveSingleItem().SerializedValueJson.ShouldBeNull(); + } + [Fact] public async Task SearchJobs_HandlerSubstringSearch_MatchesAssemblyUnqualifiedHandler() { @@ -425,21 +498,27 @@ protected static EnqueueJobRequest CreateRequest( RetryBackoffKind? retryBackoffKind = null, TimeSpan? retryBaseDelay = null, string? serviceType = null, - string? methodName = null) + string? methodName = null, + IReadOnlyList? methodParameterTypes = null, + SerializedJobPayload? serializedArguments = null, + JobInvocationTargetKind invocationTargetKind = JobInvocationTargetKind.Instance, + IReadOnlyList? methodParameterBindings = null) => new( jobId, priority, serviceType ?? typeof(InspectionContractService).AssemblyQualifiedName!, methodName ?? nameof(InspectionContractService.RunAsync), - [typeof(CancellationToken).AssemblyQualifiedName!], - new SerializedJobPayload(SystemTextJsonJobPayloadSerializer.JsonContentType, "[]"u8.ToArray()), + methodParameterTypes ?? [typeof(CancellationToken).AssemblyQualifiedName!], + serializedArguments ?? new SerializedJobPayload(SystemTextJsonJobPayloadSerializer.JsonContentType, "[]"u8.ToArray()), groupKeys ?? [], DateTimeOffset.UtcNow, notBeforeUtc, maxAttempts, retryBackoffKind, retryBaseDelay, - Tags: tags); + Tags: tags, + InvocationTargetKind: invocationTargetKind, + MethodParameterBindings: methodParameterBindings); protected static UpsertRecurringScheduleRequest CreateSchedule( string scheduleKey, @@ -474,4 +553,20 @@ private sealed class InspectionContractService public Task RunAsync(CancellationToken cancellationToken) => Task.CompletedTask; } + + private sealed record InspectionPayload(string Name, int Count); + + private sealed class InspectionDependency + { + } + + private sealed class InspectionInvocationService + { + public Task RunAsync( + InspectionPayload payload, + InspectionDependency dependency, + IJobContext context, + CancellationToken cancellationToken) + => Task.CompletedTask; + } } From cd68b5586dfc030831b7b6204b8ad87f6b3465e6 Mon Sep 17 00:00:00 2001 From: Brian Tyler Date: Sun, 26 Apr 2026 15:14:31 +0100 Subject: [PATCH 4/6] feat(invocation): add reconstructed call display and formatter for job invocations --- .../Components/Pages/JobDetail.razor | 17 +++ .../PostgresJobInspectionOperation.cs | 3 + .../Jobs/JobInvocationDisplayFormatter.cs | 132 ++++++++++++++++++ .../Jobs/JobInvocationInspection.cs | 1 + .../DashboardEndpointTests.cs | 21 +-- .../InspectionContractTests.cs | 8 ++ .../JobInvocationDisplayFormatterTests.cs | 64 +++++++++ 7 files changed, 236 insertions(+), 10 deletions(-) create mode 100644 src/Sheddueller/Inspection/Jobs/JobInvocationDisplayFormatter.cs create mode 100644 test/Sheddueller.Tests/JobInvocationDisplayFormatterTests.cs diff --git a/src/Sheddueller.Dashboard/Components/Pages/JobDetail.razor b/src/Sheddueller.Dashboard/Components/Pages/JobDetail.razor index f00a804..5991e5f 100644 --- a/src/Sheddueller.Dashboard/Components/Pages/JobDetail.razor +++ b/src/Sheddueller.Dashboard/Components/Pages/JobDetail.razor @@ -213,6 +213,8 @@

Invocation

+
@invocation.ReconstructedCall
+