diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4546a0d..6e49788 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -5,6 +5,10 @@ on: branches: - "**" +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: test: name: Build and test (${{ matrix.postgres-image }}) diff --git a/Directory.Packages.props b/Directory.Packages.props index 2a77d19..22485a2 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -1,20 +1,23 @@ true + [10.0.0, 11.0.0) + [10.0.0, 11.0.0) - - - - - + + + + + + - + - + diff --git a/src/Sheddueller.Dashboard/Components/DashboardApp.razor b/src/Sheddueller.Dashboard/Components/DashboardApp.razor index 1175756..dd05bef 100644 --- a/src/Sheddueller.Dashboard/Components/DashboardApp.razor +++ b/src/Sheddueller.Dashboard/Components/DashboardApp.razor @@ -12,9 +12,12 @@ + + + diff --git a/src/Sheddueller.Dashboard/Components/Pages/JobDetail.razor b/src/Sheddueller.Dashboard/Components/Pages/JobDetail.razor index 6bf4599..620d0fd 100644 --- a/src/Sheddueller.Dashboard/Components/Pages/JobDetail.razor +++ b/src/Sheddueller.Dashboard/Components/Pages/JobDetail.razor @@ -1,8 +1,10 @@ @page "/jobs/{JobId:guid}" @inherits DashboardPageComponent +@using Microsoft.JSInterop @inject IJobInspectionReader Reader @inject IJobManager JobManager @inject DashboardLiveUpdateStream LiveUpdates +@inject IJSRuntime JsRuntime
@@ -208,6 +210,54 @@
+ @if (_detail.Invocation is { } invocation) + { +
+

Invocation

+ +
@invocation.ReconstructedCall
+ +
+ + + +
+ + @if (invocation.SerializedArgumentsStatus != JobSerializedArgumentsInspectionStatus.Displayable) + { +

@DashboardFormat.InvocationStatus(invocation)

+ } + +
+ @foreach (var parameter in invocation.Parameters) + { +
+
+ @string.Create(CultureInfo.InvariantCulture, $"#{parameter.ParameterIndex + 1}") + @DashboardFormat.ShortTypeName(parameter.ParameterType) + @DashboardFormat.InvocationBinding(parameter) +
+ + @if (parameter.SerializedValueJson is not null) + { +
@parameter.SerializedValueJson
+ } +
+ } +
+
+ } +

Lifecycle Timeline

@@ -571,6 +621,8 @@ .job-detail-mono, .job-detail-metadata-item strong, .job-detail-chip, + .job-detail-invocation-call, + .job-detail-parameter pre, .job-detail-progress-percent, .job-detail-log-table, .job-detail-timeline__time span { @@ -944,6 +996,112 @@ line-height: 16px; } + .job-detail-invocation-panel { + display: flex; + flex-direction: column; + gap: 14px; + } + + .job-detail-invocation-summary { + display: grid; + grid-template-columns: minmax(0, 2fr) minmax(120px, 1fr) minmax(160px, 1fr); + gap: 12px; + margin-top: 2px; + } + + .job-detail-invocation-call { + overflow: auto; + margin: 0; + border: 1px solid var(--sd-outline-variant); + border-radius: 2px; + background: var(--sd-surface-container); + color: var(--sd-on-surface); + font-size: 13px; + line-height: 20px; + padding: 10px; + white-space: pre-wrap; + overflow-wrap: anywhere; + } + + pre.job-detail-invocation-call[class*="language-"] { + overflow: auto; + margin: 0; + border: 1px solid var(--sd-outline-variant); + border-radius: 2px; + font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace; + font-size: 13px; + line-height: 20px; + padding: 10px; + white-space: pre-wrap; + overflow-wrap: anywhere; + } + + pre.job-detail-invocation-call[class*="language-"] > code[class*="language-"] { + display: block; + background: transparent; + font: inherit; + white-space: inherit; + overflow-wrap: inherit; + } + + .job-detail-parameter-list { + display: flex; + min-width: 0; + flex-direction: column; + border-top: 1px solid var(--sd-outline-variant); + } + + .job-detail-parameter { + min-width: 0; + border-bottom: 1px solid var(--sd-outline-variant); + padding: 10px 0; + } + + .job-detail-parameter:last-child { + border-bottom: 0; + padding-bottom: 0; + } + + .job-detail-parameter__header { + display: flex; + min-width: 0; + align-items: center; + gap: 8px; + } + + .job-detail-parameter__header span, + .job-detail-parameter__header em { + flex: 0 0 auto; + color: var(--sd-on-surface-variant); + font-size: 12px; + font-style: normal; + line-height: 16px; + } + + .job-detail-parameter__header strong { + overflow: hidden; + color: var(--sd-on-surface); + font-size: 13px; + line-height: 18px; + text-overflow: ellipsis; + white-space: nowrap; + } + + .job-detail-parameter pre { + overflow: auto; + max-height: 240px; + margin-top: 8px; + border: 1px solid var(--sd-outline-variant); + border-radius: 2px; + background: var(--sd-surface-container); + color: var(--sd-on-surface); + font-size: 12px; + line-height: 18px; + padding: 10px; + white-space: pre-wrap; + overflow-wrap: anywhere; + } + .job-detail-operations { gap: var(--sd-page-margin); } @@ -1335,6 +1493,10 @@ .job-detail-timeline__time { text-align: left; } + + .job-detail-invocation-summary { + grid-template-columns: 1fr; + } } @@ -1342,10 +1504,12 @@ private const int EventPageSize = 500; private readonly List _events = []; + private ElementReference _invocationCodeElement; private JobInspectionDetail? _detail; private Guid? _loadedJobId; private string? _error; private string? _actionMessage; + private string? _highlightedInvocationCall; private JobDetailActionAlertKind _actionAlertKind; private bool _isLoading; private bool _isCancelRunning; @@ -1418,6 +1582,7 @@ this._loadedJobId = this.JobId; this._detail = null; this._events.Clear(); + this._highlightedInvocationCall = null; this._error = null; this.ClearActionAlert(); this._isCancelRunning = false; @@ -1428,6 +1593,31 @@ await this.LoadAsync(); } + protected override async Task OnAfterRenderAsync(bool firstRender) + { + _ = firstRender; + + if (this._detail?.Invocation is not { } invocation || + string.Equals(this._highlightedInvocationCall, invocation.ReconstructedCall, StringComparison.Ordinal)) + { + return; + } + + try + { + await JsRuntime.InvokeVoidAsync( + "ShedduellerDashboard.highlightCode", + this._invocationCodeElement); + this._highlightedInvocationCall = invocation.ReconstructedCall; + } + catch (JSDisconnectedException) + { + } + catch (JSException) + { + } + } + private protected override ValueTask DisposePageAsync() { LiveUpdates.JobEventPublished -= this.OnJobEventPublishedAsync; diff --git a/src/Sheddueller.Dashboard/Internal/DashboardFormat.cs b/src/Sheddueller.Dashboard/Internal/DashboardFormat.cs index 603f096..dd96a39 100644 --- a/src/Sheddueller.Dashboard/Internal/DashboardFormat.cs +++ b/src/Sheddueller.Dashboard/Internal/DashboardFormat.cs @@ -28,6 +28,26 @@ public static string FullHandler(JobInspectionSummary job) public static string ShortHandler(JobInspectionSummary job) => string.Concat(ShortTypeName(job.ServiceType), ".", job.MethodName); + public static string InvocationHandler(JobInvocationInspection invocation) + => string.Concat(ShortTypeName(invocation.ServiceType), ".", invocation.MethodName); + + public static string InvocationBinding(JobInvocationParameterInspection parameter) + => parameter.Binding.Kind switch + { + JobMethodParameterBindingKind.Serialized => "serialized", + JobMethodParameterBindingKind.CancellationToken => "scheduler-owned", + JobMethodParameterBindingKind.JobContext => "Job.Context", + JobMethodParameterBindingKind.Service => string.Create( + CultureInfo.InvariantCulture, + $"Job.Resolve<{ShortTypeName(parameter.Binding.ServiceType ?? parameter.ParameterType)}>()"), + _ => parameter.Binding.Kind.ToString(), + }; + + public static string InvocationStatus(JobInvocationInspection invocation) + => invocation.SerializedArgumentsStatus == JobSerializedArgumentsInspectionStatus.Displayable + ? "Serialized arguments are displayable." + : FirstNonEmpty(invocation.SerializedArgumentsStatusMessage, invocation.SerializedArgumentsStatus.ToString()); + public static string Attempts(JobInspectionSummary job, bool compact = false) => compact ? string.Create(CultureInfo.InvariantCulture, $"{job.AttemptCount}/{job.MaxAttempts}") @@ -292,7 +312,7 @@ public static string LogLevelClass(JobEvent jobEvent) public static string FirstNonEmpty(params string?[] values) => values.FirstOrDefault(static value => !string.IsNullOrWhiteSpace(value)) ?? string.Empty; - private static string ShortTypeName(string typeName) + public static string ShortTypeName(string typeName) { var typeDelimiterIndex = typeName.IndexOf(',', StringComparison.Ordinal); if (typeDelimiterIndex >= 0) diff --git a/src/Sheddueller.Dashboard/Internal/DashboardJobEventListenerService.cs b/src/Sheddueller.Dashboard/Internal/DashboardJobEventListenerService.cs index dd71a12..5c55157 100644 --- a/src/Sheddueller.Dashboard/Internal/DashboardJobEventListenerService.cs +++ b/src/Sheddueller.Dashboard/Internal/DashboardJobEventListenerService.cs @@ -1,15 +1,18 @@ namespace Sheddueller.Dashboard.Internal; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Sheddueller.Runtime; internal sealed class DashboardJobEventListenerService( - IEnumerable listeners) : BackgroundService + IEnumerable listeners, + ILogger logger) : BackgroundService { protected override Task ExecuteAsync(CancellationToken stoppingToken) { var snapshot = listeners.ToArray(); + logger.DashboardJobEventListenerServiceStarted(snapshot.Length); return snapshot.Length == 0 ? Task.CompletedTask : Task.WhenAll(snapshot.Select(listener => listener.ListenAsync(stoppingToken))); diff --git a/src/Sheddueller.Dashboard/Internal/JobEventRetentionService.cs b/src/Sheddueller.Dashboard/Internal/JobEventRetentionService.cs index 9077bac..811528a 100644 --- a/src/Sheddueller.Dashboard/Internal/JobEventRetentionService.cs +++ b/src/Sheddueller.Dashboard/Internal/JobEventRetentionService.cs @@ -1,7 +1,10 @@ namespace Sheddueller.Dashboard.Internal; +using System.Diagnostics.CodeAnalysis; + using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Sheddueller.Dashboard; @@ -9,15 +12,29 @@ namespace Sheddueller.Dashboard.Internal; internal sealed class JobEventRetentionService( IServiceProvider serviceProvider, - IOptions options) : BackgroundService + IOptions options, + ILogger logger) : BackgroundService { + [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Retention cleanup failures are diagnostic and should not stop the dashboard host.")] protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { while (!stoppingToken.IsCancellationRequested) { - await this.CleanupOnceAsync(stoppingToken).ConfigureAwait(false); + try + { + await this.CleanupOnceAsync(stoppingToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + throw; + } + catch (Exception exception) + { + logger.DashboardEventRetentionCleanupFailed(exception); + } + await Task.Delay(TimeSpan.FromHours(1), stoppingToken).ConfigureAwait(false); } } @@ -31,9 +48,14 @@ private async ValueTask CleanupOnceAsync(CancellationToken cancellationToken) var store = serviceProvider.GetService(); if (store is null) { + logger.DashboardEventRetentionStoreMissing(); return; } - await store.CleanupAsync(options.Value.EventRetention, cancellationToken).ConfigureAwait(false); + var deleted = await store.CleanupAsync(options.Value.EventRetention, cancellationToken).ConfigureAwait(false); + if (deleted > 0) + { + logger.DashboardEventRetentionCleaned(deleted); + } } } diff --git a/src/Sheddueller.Dashboard/Internal/ShedduellerDashboardLoggerMessages.cs b/src/Sheddueller.Dashboard/Internal/ShedduellerDashboardLoggerMessages.cs new file mode 100644 index 0000000..4b2718b --- /dev/null +++ b/src/Sheddueller.Dashboard/Internal/ShedduellerDashboardLoggerMessages.cs @@ -0,0 +1,47 @@ +namespace Microsoft.Extensions.Logging; + +internal static partial class ShedduellerDashboardLoggerMessages +{ + private const int EventIdStart = 1300; + + [LoggerMessage( + EventIdStart + 0, + LogLevel.Debug, + "Dashboard job-event listener service started with {ListenerCount} listeners.")] + public static partial void DashboardJobEventListenerServiceStarted( + this ILogger logger, + int listenerCount); + + [LoggerMessage( + EventIdStart + 10, + LogLevel.Warning, + "Dashboard failed to publish live job event {EventSequence} for job {JobId}.")] + public static partial void DashboardJobEventPublishFailed( + this ILogger logger, + Exception exception, + Guid jobId, + long eventSequence); + + [LoggerMessage( + EventIdStart + 20, + LogLevel.Debug, + "Dashboard job-event retention cleanup skipped because no retention store is registered.")] + public static partial void DashboardEventRetentionStoreMissing( + this ILogger logger); + + [LoggerMessage( + EventIdStart + 21, + LogLevel.Information, + "Dashboard job-event retention cleanup deleted {DeletedCount} events.")] + public static partial void DashboardEventRetentionCleaned( + this ILogger logger, + int deletedCount); + + [LoggerMessage( + EventIdStart + 22, + LogLevel.Warning, + "Dashboard job-event retention cleanup failed.")] + public static partial void DashboardEventRetentionCleanupFailed( + this ILogger logger, + Exception exception); +} diff --git a/src/Sheddueller.Dashboard/Internal/SignalRJobEventNotifier.cs b/src/Sheddueller.Dashboard/Internal/SignalRJobEventNotifier.cs index ce33c64..7292336 100644 --- a/src/Sheddueller.Dashboard/Internal/SignalRJobEventNotifier.cs +++ b/src/Sheddueller.Dashboard/Internal/SignalRJobEventNotifier.cs @@ -1,21 +1,31 @@ namespace Sheddueller.Dashboard.Internal; using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging; using Sheddueller.Storage; internal sealed class SignalRJobEventNotifier( IHubContext hubContext, - DashboardLiveUpdateStream stream) : IJobEventNotifier + DashboardLiveUpdateStream stream, + ILogger logger) : IJobEventNotifier { public async ValueTask NotifyAsync( JobEvent jobEvent, CancellationToken cancellationToken = default) { - await stream.NotifyAsync(jobEvent, cancellationToken).ConfigureAwait(false); - await hubContext.Clients.All.SendAsync("jobEvent", jobEvent, cancellationToken).ConfigureAwait(false); - await hubContext.Clients.Group(DashboardUpdatesHub.JobGroupName(jobEvent.JobId.ToString("N"))) - .SendAsync("jobEvent", jobEvent, cancellationToken) - .ConfigureAwait(false); + try + { + await stream.NotifyAsync(jobEvent, cancellationToken).ConfigureAwait(false); + await hubContext.Clients.All.SendAsync("jobEvent", jobEvent, cancellationToken).ConfigureAwait(false); + await hubContext.Clients.Group(DashboardUpdatesHub.JobGroupName(jobEvent.JobId.ToString("N"))) + .SendAsync("jobEvent", jobEvent, cancellationToken) + .ConfigureAwait(false); + } + catch (Exception exception) + { + logger.DashboardJobEventPublishFailed(exception, jobEvent.JobId, jobEvent.EventSequence); + throw; + } } } diff --git a/src/Sheddueller.Dashboard/wwwroot/vendor/prism/LICENSE.txt b/src/Sheddueller.Dashboard/wwwroot/vendor/prism/LICENSE.txt new file mode 100644 index 0000000..528949f --- /dev/null +++ b/src/Sheddueller.Dashboard/wwwroot/vendor/prism/LICENSE.txt @@ -0,0 +1,21 @@ +MIT LICENSE + +Copyright (c) 2012 Lea Verou + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/src/Sheddueller.Dashboard/wwwroot/vendor/prism/prism.css b/src/Sheddueller.Dashboard/wwwroot/vendor/prism/prism.css new file mode 100644 index 0000000..8c4cc05 --- /dev/null +++ b/src/Sheddueller.Dashboard/wwwroot/vendor/prism/prism.css @@ -0,0 +1 @@ +code[class*=language-],pre[class*=language-]{color:#000;background:0 0;text-shadow:0 1px #fff;font-family:Consolas,Monaco,'Andale Mono','Ubuntu Mono',monospace;font-size:1em;text-align:left;white-space:pre;word-spacing:normal;word-break:normal;word-wrap:normal;line-height:1.5;-moz-tab-size:4;-o-tab-size:4;tab-size:4;-webkit-hyphens:none;-moz-hyphens:none;-ms-hyphens:none;hyphens:none}code[class*=language-] ::-moz-selection,code[class*=language-]::-moz-selection,pre[class*=language-] ::-moz-selection,pre[class*=language-]::-moz-selection{text-shadow:none;background:#b3d4fc}code[class*=language-] ::selection,code[class*=language-]::selection,pre[class*=language-] ::selection,pre[class*=language-]::selection{text-shadow:none;background:#b3d4fc}@media print{code[class*=language-],pre[class*=language-]{text-shadow:none}}pre[class*=language-]{padding:1em;margin:.5em 0;overflow:auto}:not(pre)>code[class*=language-],pre[class*=language-]{background:#f5f2f0}:not(pre)>code[class*=language-]{padding:.1em;border-radius:.3em;white-space:normal}.token.cdata,.token.comment,.token.doctype,.token.prolog{color:#708090}.token.punctuation{color:#999}.token.namespace{opacity:.7}.token.boolean,.token.constant,.token.deleted,.token.number,.token.property,.token.symbol,.token.tag{color:#905}.token.attr-name,.token.builtin,.token.char,.token.inserted,.token.selector,.token.string{color:#690}.language-css .token.string,.style .token.string,.token.entity,.token.operator,.token.url{color:#9a6e3a;background:hsla(0,0%,100%,.5)}.token.atrule,.token.attr-value,.token.keyword{color:#07a}.token.class-name,.token.function{color:#dd4a68}.token.important,.token.regex,.token.variable{color:#e90}.token.bold,.token.important{font-weight:700}.token.italic{font-style:italic}.token.entity{cursor:help} \ No newline at end of file diff --git a/src/Sheddueller.Dashboard/wwwroot/vendor/prism/prism.js b/src/Sheddueller.Dashboard/wwwroot/vendor/prism/prism.js new file mode 100644 index 0000000..b270e91 --- /dev/null +++ b/src/Sheddueller.Dashboard/wwwroot/vendor/prism/prism.js @@ -0,0 +1,4 @@ +/*! PrismJS 1.30.0 | MIT License | https://prismjs.com */ +var _self="undefined"!=typeof window?window:"undefined"!=typeof WorkerGlobalScope&&self instanceof WorkerGlobalScope?self:{},Prism=function(e){var n=/(?:^|\s)lang(?:uage)?-([\w-]+)(?=\s|$)/i,t=0,r={},a={manual:e.Prism&&e.Prism.manual,disableWorkerMessageHandler:e.Prism&&e.Prism.disableWorkerMessageHandler,util:{encode:function e(n){return n instanceof i?new i(n.type,e(n.content),n.alias):Array.isArray(n)?n.map(e):n.replace(/&/g,"&").replace(/=g.reach);A+=w.value.length,w=w.next){var P=w.value;if(n.length>e.length)return;if(!(P instanceof i)){var E,S=1;if(y){if(!(E=l(b,A,e,m))||E.index>=e.length)break;var L=E.index,O=E.index+E[0].length,C=A;for(C+=w.value.length;L>=C;)C+=(w=w.next).value.length;if(A=C-=w.value.length,w.value instanceof i)continue;for(var j=w;j!==n.tail&&(Cg.reach&&(g.reach=W);var I=w.prev;if(_&&(I=u(n,I,_),A+=_.length),c(n,I,S),w=u(n,I,new i(f,p?a.tokenize(N,p):N,k,N)),M&&u(n,w,M),S>1){var T={cause:f+","+d,reach:W};o(e,n,t,w.prev,A,T),g&&T.reach>g.reach&&(g.reach=T.reach)}}}}}}function s(){var e={value:null,prev:null,next:null},n={value:null,prev:e,next:null};e.next=n,this.head=e,this.tail=n,this.length=0}function u(e,n,t){var r=n.next,a={value:t,prev:n,next:r};return n.next=a,r.prev=a,e.length++,a}function c(e,n,t){for(var r=n.next,a=0;a"+i.content+""},!e.document)return e.addEventListener?(a.disableWorkerMessageHandler||e.addEventListener("message",(function(n){var t=JSON.parse(n.data),r=t.language,i=t.code,l=t.immediateClose;e.postMessage(a.highlight(i,a.languages[r],r)),l&&e.close()}),!1),a):a;var g=a.util.currentScript();function f(){a.manual||a.highlightAll()}if(g&&(a.filename=g.src,g.hasAttribute("data-manual")&&(a.manual=!0)),!a.manual){var h=document.readyState;"loading"===h||"interactive"===h&&g&&g.defer?document.addEventListener("DOMContentLoaded",f):window.requestAnimationFrame?window.requestAnimationFrame(f):window.setTimeout(f,16)}return a}(_self);"undefined"!=typeof module&&module.exports&&(module.exports=Prism),"undefined"!=typeof global&&(global.Prism=Prism); +Prism.languages.clike={comment:[{pattern:/(^|[^\\])\/\*[\s\S]*?(?:\*\/|$)/,lookbehind:!0,greedy:!0},{pattern:/(^|[^\\:])\/\/.*/,lookbehind:!0,greedy:!0}],string:{pattern:/(["'])(?:\\(?:\r\n|[\s\S])|(?!\1)[^\\\r\n])*\1/,greedy:!0},"class-name":{pattern:/(\b(?:class|extends|implements|instanceof|interface|new|trait)\s+|\bcatch\s+\()[\w.\\]+/i,lookbehind:!0,inside:{punctuation:/[.\\]/}},keyword:/\b(?:break|catch|continue|do|else|finally|for|function|if|in|instanceof|new|null|return|throw|try|while)\b/,boolean:/\b(?:false|true)\b/,function:/\b\w+(?=\()/,number:/\b0x[\da-f]+\b|(?:\b\d+(?:\.\d*)?|\B\.\d+)(?:e[+-]?\d+)?/i,operator:/[<>]=?|[!=]=?=?|--?|\+\+?|&&?|\|\|?|[?*/~^%]/,punctuation:/[{}[\];(),.:]/}; +!function(e){function n(e,n){return e.replace(/<<(\d+)>>/g,(function(e,s){return"(?:"+n[+s]+")"}))}function s(e,s,a){return RegExp(n(e,s),a||"")}function a(e,n){for(var s=0;s>/g,(function(){return"(?:"+e+")"}));return e.replace(/<>/g,"[^\\s\\S]")}var t="bool byte char decimal double dynamic float int long object sbyte short string uint ulong ushort var void",r="class enum interface record struct",i="add alias and ascending async await by descending from(?=\\s*(?:\\w|$)) get global group into init(?=\\s*;) join let nameof not notnull on or orderby partial remove select set unmanaged value when where with(?=\\s*{)",o="abstract as base break case catch checked const continue default delegate do else event explicit extern finally fixed for foreach goto if implicit in internal is lock namespace new null operator out override params private protected public readonly ref return sealed sizeof stackalloc static switch this throw try typeof unchecked unsafe using virtual volatile while yield";function l(e){return"\\b(?:"+e.trim().replace(/ /g,"|")+")\\b"}var d=l(r),p=RegExp(l(t+" "+r+" "+i+" "+o)),c=l(r+" "+i+" "+o),u=l(t+" "+r+" "+o),g=a("<(?:[^<>;=+\\-*/%&|^]|<>)*>",2),b=a("\\((?:[^()]|<>)*\\)",2),h="@?\\b[A-Za-z_]\\w*\\b",f=n("<<0>>(?:\\s*<<1>>)?",[h,g]),m=n("(?!<<0>>)<<1>>(?:\\s*\\.\\s*<<1>>)*",[c,f]),k="\\[\\s*(?:,\\s*)*\\]",y=n("<<0>>(?:\\s*(?:\\?\\s*)?<<1>>)*(?:\\s*\\?)?",[m,k]),w=n("[^,()<>[\\];=+\\-*/%&|^]|<<0>>|<<1>>|<<2>>",[g,b,k]),v=n("\\(<<0>>+(?:,<<0>>+)+\\)",[w]),x=n("(?:<<0>>|<<1>>)(?:\\s*(?:\\?\\s*)?<<2>>)*(?:\\s*\\?)?",[v,m,k]),$={keyword:p,punctuation:/[<>()?,.:[\]]/},_="'(?:[^\r\n'\\\\]|\\\\.|\\\\[Uux][\\da-fA-F]{1,8})'",B='"(?:\\\\.|[^\\\\"\r\n])*"';e.languages.csharp=e.languages.extend("clike",{string:[{pattern:s("(^|[^$\\\\])<<0>>",['@"(?:""|\\\\[^]|[^\\\\"])*"(?!")']),lookbehind:!0,greedy:!0},{pattern:s("(^|[^@$\\\\])<<0>>",[B]),lookbehind:!0,greedy:!0}],"class-name":[{pattern:s("(\\busing\\s+static\\s+)<<0>>(?=\\s*;)",[m]),lookbehind:!0,inside:$},{pattern:s("(\\busing\\s+<<0>>\\s*=\\s*)<<1>>(?=\\s*;)",[h,x]),lookbehind:!0,inside:$},{pattern:s("(\\busing\\s+)<<0>>(?=\\s*=)",[h]),lookbehind:!0},{pattern:s("(\\b<<0>>\\s+)<<1>>",[d,f]),lookbehind:!0,inside:$},{pattern:s("(\\bcatch\\s*\\(\\s*)<<0>>",[m]),lookbehind:!0,inside:$},{pattern:s("(\\bwhere\\s+)<<0>>",[h]),lookbehind:!0},{pattern:s("(\\b(?:is(?:\\s+not)?|as)\\s+)<<0>>",[y]),lookbehind:!0,inside:$},{pattern:s("\\b<<0>>(?=\\s+(?!<<1>>|with\\s*\\{)<<2>>(?:\\s*[=,;:{)\\]]|\\s+(?:in|when)\\b))",[x,u,h]),inside:$}],keyword:p,number:/(?:\b0(?:x[\da-f_]*[\da-f]|b[01_]*[01])|(?:\B\.\d+(?:_+\d+)*|\b\d+(?:_+\d+)*(?:\.\d+(?:_+\d+)*)?)(?:e[-+]?\d+(?:_+\d+)*)?)(?:[dflmu]|lu|ul)?\b/i,operator:/>>=?|<<=?|[-=]>|([-+&|])\1|~|\?\?=?|[-+*/%&|^!=<>]=?/,punctuation:/\?\.?|::|[{}[\];(),.:]/}),e.languages.insertBefore("csharp","number",{range:{pattern:/\.\./,alias:"operator"}}),e.languages.insertBefore("csharp","punctuation",{"named-parameter":{pattern:s("([(,]\\s*)<<0>>(?=\\s*:)",[h]),lookbehind:!0,alias:"punctuation"}}),e.languages.insertBefore("csharp","class-name",{namespace:{pattern:s("(\\b(?:namespace|using)\\s+)<<0>>(?:\\s*\\.\\s*<<0>>)*(?=\\s*[;{])",[h]),lookbehind:!0,inside:{punctuation:/\./}},"type-expression":{pattern:s("(\\b(?:default|sizeof|typeof)\\s*\\(\\s*(?!\\s))(?:[^()\\s]|\\s(?!\\s)|<<0>>)*(?=\\s*\\))",[b]),lookbehind:!0,alias:"class-name",inside:$},"return-type":{pattern:s("<<0>>(?=\\s+(?:<<1>>\\s*(?:=>|[({]|\\.\\s*this\\s*\\[)|this\\s*\\[))",[x,m]),inside:$,alias:"class-name"},"constructor-invocation":{pattern:s("(\\bnew\\s+)<<0>>(?=\\s*[[({])",[x]),lookbehind:!0,inside:$,alias:"class-name"},"generic-method":{pattern:s("<<0>>\\s*<<1>>(?=\\s*\\()",[h,g]),inside:{function:s("^<<0>>",[h]),generic:{pattern:RegExp(g),alias:"class-name",inside:$}}},"type-list":{pattern:s("\\b((?:<<0>>\\s+<<1>>|record\\s+<<1>>\\s*<<5>>|where\\s+<<2>>)\\s*:\\s*)(?:<<3>>|<<4>>|<<1>>\\s*<<5>>|<<6>>)(?:\\s*,\\s*(?:<<3>>|<<4>>|<<6>>))*(?=\\s*(?:where|[{;]|=>|$))",[d,f,h,x,p.source,b,"\\bnew\\s*\\(\\s*\\)"]),lookbehind:!0,inside:{"record-arguments":{pattern:s("(^(?!new\\s*\\()<<0>>\\s*)<<1>>",[f,b]),lookbehind:!0,greedy:!0,inside:e.languages.csharp},keyword:p,"class-name":{pattern:RegExp(x),greedy:!0,inside:$},punctuation:/[,()]/}},preprocessor:{pattern:/(^[\t ]*)#.*/m,lookbehind:!0,alias:"property",inside:{directive:{pattern:/(#)\b(?:define|elif|else|endif|endregion|error|if|line|nullable|pragma|region|undef|warning)\b/,lookbehind:!0,alias:"keyword"}}}});var E=B+"|"+_,R=n("/(?![*/])|//[^\r\n]*[\r\n]|/\\*(?:[^*]|\\*(?!/))*\\*/|<<0>>",[E]),z=a(n("[^\"'/()]|<<0>>|\\(<>*\\)",[R]),2),S="\\b(?:assembly|event|field|method|module|param|property|return|type)\\b",j=n("<<0>>(?:\\s*\\(<<1>>*\\))?",[m,z]);e.languages.insertBefore("csharp","class-name",{attribute:{pattern:s("((?:^|[^\\s\\w>)?])\\s*\\[\\s*)(?:<<0>>\\s*:\\s*)?<<1>>(?:\\s*,\\s*<<1>>)*(?=\\s*\\])",[S,j]),lookbehind:!0,greedy:!0,inside:{target:{pattern:s("^<<0>>(?=\\s*:)",[S]),alias:"keyword"},"attribute-arguments":{pattern:s("\\(<<0>>*\\)",[z]),inside:e.languages.csharp},"class-name":{pattern:RegExp(m),inside:{punctuation:/\./}},punctuation:/[:,]/}}});var A=":[^}\r\n]+",F=a(n("[^\"'/()]|<<0>>|\\(<>*\\)",[R]),2),P=n("\\{(?!\\{)(?:(?![}:])<<0>>)*<<1>>?\\}",[F,A]),U=a(n("[^\"'/()]|/(?!\\*)|/\\*(?:[^*]|\\*(?!/))*\\*/|<<0>>|\\(<>*\\)",[E]),2),Z=n("\\{(?!\\{)(?:(?![}:])<<0>>)*<<1>>?\\}",[U,A]);function q(n,a){return{interpolation:{pattern:s("((?:^|[^{])(?:\\{\\{)*)<<0>>",[n]),lookbehind:!0,inside:{"format-string":{pattern:s("(^\\{(?:(?![}:])<<0>>)*)<<1>>(?=\\}$)",[a,A]),lookbehind:!0,inside:{punctuation:/^:/}},punctuation:/^\{|\}$/,expression:{pattern:/[\s\S]+/,alias:"language-csharp",inside:e.languages.csharp}}},string:/[\s\S]+/}}e.languages.insertBefore("csharp","string",{"interpolation-string":[{pattern:s('(^|[^\\\\])(?:\\$@|@\\$)"(?:""|\\\\[^]|\\{\\{|<<0>>|[^\\\\{"])*"',[P]),lookbehind:!0,greedy:!0,inside:q(P,F)},{pattern:s('(^|[^@\\\\])\\$"(?:\\\\.|\\{\\{|<<0>>|[^\\\\"{])*"',[Z]),lookbehind:!0,greedy:!0,inside:q(Z,U)}],char:{pattern:RegExp(_),greedy:!0}}),e.languages.dotnet=e.languages.cs=e.languages.csharp}(Prism); diff --git a/src/Sheddueller.Dashboard/wwwroot/vendor/prism/sheddueller-prism.js b/src/Sheddueller.Dashboard/wwwroot/vendor/prism/sheddueller-prism.js new file mode 100644 index 0000000..4776643 --- /dev/null +++ b/src/Sheddueller.Dashboard/wwwroot/vendor/prism/sheddueller-prism.js @@ -0,0 +1,9 @@ +window.ShedduellerDashboard = window.ShedduellerDashboard || {}; + +window.ShedduellerDashboard.highlightCode = function (element) { + if (!element || !window.Prism || typeof window.Prism.highlightElement !== "function") { + return; + } + + window.Prism.highlightElement(element); +}; diff --git a/src/Sheddueller.Postgres/Internal/Operations/PostgresJobInspectionOperation.cs b/src/Sheddueller.Postgres/Internal/Operations/PostgresJobInspectionOperation.cs index c058a6a..d59cac8 100644 --- a/src/Sheddueller.Postgres/Internal/Operations/PostgresJobInspectionOperation.cs +++ b/src/Sheddueller.Postgres/Internal/Operations/PostgresJobInspectionOperation.cs @@ -2,14 +2,23 @@ namespace Sheddueller.Postgres.Internal.Operations; using System.Globalization; using System.Runtime.CompilerServices; +using System.Text.Json; using Npgsql; using Sheddueller.Inspection.Jobs; +using Sheddueller.Serialization; using Sheddueller.Storage; internal static class PostgresJobInspectionOperation { + private const int SerializedArgumentDisplayByteLimit = 64 * 1024; + + private static readonly JsonSerializerOptions SerializedArgumentDisplayJsonOptions = new() + { + WriteIndented = true, + }; + public static async ValueTask GetOverviewAsync( PostgresOperationContext context, CancellationToken cancellationToken) @@ -134,6 +143,7 @@ await CreateSummaryAsync(context, connection, row, cancellationToken).ConfigureA row.LeaseExpiresAtUtc, row.ScheduledFireAtUtc) { + Invocation = await ReadInvocationAsync(context, connection, row, cancellationToken).ConfigureAwait(false), RetryCloneJobIds = await ReadRetryCloneJobIdsAsync(context, connection, jobId, cancellationToken).ConfigureAwait(false), }; } @@ -389,6 +399,139 @@ from claimable return rows.Count == 0 ? null : rows[0]; } + private static async ValueTask ReadInvocationAsync( + PostgresOperationContext context, + NpgsqlConnection connection, + PostgresJobInspectionRow row, + CancellationToken cancellationToken) + { + await using var command = connection.CreateCommand(); + command.CommandText = + $""" + select + method_parameter_types, + invocation_target_kind, + method_parameter_bindings, + serialized_arguments_content_type, + serialized_arguments + from {context.Names.Jobs} + where job_id = @job_id; + """; + command.Parameters.AddWithValue("job_id", row.JobId); + + await using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false); + if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false)) + { + return null; + } + + var parameterTypes = reader.GetFieldValue(0); + var targetKind = PostgresConversion.ToInvocationTargetKind(reader.GetValue(1)); + var parameterBindings = JobMethodParameterBindingResolver.Normalize( + parameterTypes, + PostgresConversion.ToParameterBindings(reader.GetValue(2))); + var payload = PostgresConversion.ToPayload(reader.GetValue(3), reader.GetValue(4)); + + return CreateInvocation(row, targetKind, parameterTypes, parameterBindings, payload); + } + + private static JobInvocationInspection CreateInvocation( + PostgresJobInspectionRow row, + JobInvocationTargetKind targetKind, + string[] parameterTypes, + IReadOnlyList parameterBindings, + SerializedJobPayload payload) + { + var serializedValueJson = new string?[parameterTypes.Length]; + var status = PopulateSerializedValueJson(payload, parameterBindings, serializedValueJson, out var statusMessage); + var parameters = new JobInvocationParameterInspection[parameterTypes.Length]; + + for (var i = 0; i < parameterTypes.Length; i++) + { + parameters[i] = new JobInvocationParameterInspection( + i, + parameterTypes[i], + parameterBindings[i], + serializedValueJson[i]); + } + + var reconstructedCall = JobInvocationDisplayFormatter.Format(row.ServiceType, row.MethodName, parameters); + + return new JobInvocationInspection( + targetKind, + row.ServiceType, + row.MethodName, + reconstructedCall, + parameters, + payload.ContentType, + payload.Data.LongLength, + status, + statusMessage); + } + + private static JobSerializedArgumentsInspectionStatus PopulateSerializedValueJson( + SerializedJobPayload payload, + IReadOnlyList parameterBindings, + string?[] serializedValueJson, + out string? statusMessage) + { + if (!string.Equals(payload.ContentType, SystemTextJsonJobPayloadSerializer.JsonContentType, StringComparison.Ordinal)) + { + statusMessage = string.Create( + CultureInfo.InvariantCulture, + $"Serialized arguments use unsupported content type '{payload.ContentType}'."); + return JobSerializedArgumentsInspectionStatus.UnsupportedContentType; + } + + if (payload.Data.LongLength > SerializedArgumentDisplayByteLimit) + { + statusMessage = string.Create( + CultureInfo.InvariantCulture, + $"Serialized arguments are {payload.Data.LongLength:N0} bytes, exceeding the {SerializedArgumentDisplayByteLimit:N0} byte display limit."); + return JobSerializedArgumentsInspectionStatus.TooLarge; + } + + var serializedParameterIndexes = parameterBindings + .Select((binding, index) => (binding, index)) + .Where(parameter => parameter.binding.Kind == JobMethodParameterBindingKind.Serialized) + .Select(parameter => parameter.index) + .ToArray(); + + try + { + using var document = JsonDocument.Parse(payload.Data); + var root = document.RootElement; + if (root.ValueKind != JsonValueKind.Array) + { + statusMessage = "Serialized arguments payload is not a JSON array."; + return JobSerializedArgumentsInspectionStatus.InvalidPayload; + } + + if (root.GetArrayLength() != serializedParameterIndexes.Length) + { + statusMessage = string.Create( + CultureInfo.InvariantCulture, + $"Serialized arguments contain {root.GetArrayLength()} values for {serializedParameterIndexes.Length} serialized parameters."); + return JobSerializedArgumentsInspectionStatus.ArgumentCountMismatch; + } + + var argumentIndex = 0; + foreach (var element in root.EnumerateArray()) + { + serializedValueJson[serializedParameterIndexes[argumentIndex]] = JsonSerializer.Serialize(element, SerializedArgumentDisplayJsonOptions); + argumentIndex++; + } + + statusMessage = null; + return JobSerializedArgumentsInspectionStatus.Displayable; + } + catch (JsonException) + { + statusMessage = "Serialized arguments payload is not valid JSON."; + return JobSerializedArgumentsInspectionStatus.InvalidPayload; + } + } + private static async ValueTask> ReadRowsAsync( NpgsqlCommand command, CancellationToken cancellationToken) diff --git a/src/Sheddueller.Postgres/Internal/PostgresJobEventListener.cs b/src/Sheddueller.Postgres/Internal/PostgresJobEventListener.cs index 199a614..7a3be33 100644 --- a/src/Sheddueller.Postgres/Internal/PostgresJobEventListener.cs +++ b/src/Sheddueller.Postgres/Internal/PostgresJobEventListener.cs @@ -2,6 +2,8 @@ namespace Sheddueller.Postgres.Internal; using System.Globalization; +using Microsoft.Extensions.Logging; + using Npgsql; using Sheddueller.Postgres.Internal.Operations; @@ -10,8 +12,11 @@ namespace Sheddueller.Postgres.Internal; internal sealed class PostgresJobEventListener( ShedduellerPostgresOptions options, - IJobEventNotifier publisher) : IShedduellerJobEventListener + IJobEventNotifier publisher, + ILogger logger) : IShedduellerJobEventListener { + private static readonly TimeSpan ListenerRetryDelay = TimeSpan.FromSeconds(1); + private readonly PostgresOperationContext _context = new(options); public async Task ListenAsync(CancellationToken cancellationToken) @@ -26,9 +31,10 @@ public async Task ListenAsync(CancellationToken cancellationToken) { return; } - catch (Exception) + catch (Exception exception) { - await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken).ConfigureAwait(false); + logger.PostgresJobEventListenerRetrying(exception, options.SchemaName, (long)ListenerRetryDelay.TotalMilliseconds); + await Task.Delay(ListenerRetryDelay, cancellationToken).ConfigureAwait(false); } } } @@ -43,6 +49,7 @@ private async Task ListenUntilDisconnectedAsync(CancellationToken cancellationTo await using var command = connection.CreateCommand(); command.CommandText = $"listen {PostgresNames.JobEventChannel};"; await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); + logger.PostgresJobEventListenerStarted(options.SchemaName); while (!cancellationToken.IsCancellationRequested) { @@ -56,21 +63,37 @@ private async Task ListenUntilDisconnectedAsync(CancellationToken cancellationTo } private void OnNotification(object sender, NpgsqlNotificationEventArgs args) + => this.HandleNotificationPayload(args.Payload); + + internal void HandleNotificationPayload(string payload) { - if (!TryParsePayload(args.Payload, options.SchemaName, out var jobId, out var eventSequence)) + if (!TryParsePayload(payload, options.SchemaName, out var jobId, out var eventSequence)) { + logger.PostgresJobEventNotificationPayloadInvalid(options.SchemaName); return; } - _ = Task.Run(async () => + _ = Task.Run(() => this.PublishNotificationAsync(jobId, eventSequence)); + } + + private async Task PublishNotificationAsync(Guid jobId, long eventSequence) + { + try { var jobEvent = await PostgresJobEvents.ReadEventAsync(this._context, jobId, eventSequence, CancellationToken.None) .ConfigureAwait(false); - if (jobEvent is not null) + if (jobEvent is null) { - await publisher.NotifyAsync(jobEvent, CancellationToken.None).ConfigureAwait(false); + logger.PostgresJobEventNotificationMissing(jobId, eventSequence); + return; } - }); + + await publisher.NotifyAsync(jobEvent, CancellationToken.None).ConfigureAwait(false); + } + catch (Exception exception) + { + logger.PostgresJobEventNotificationFailed(exception, jobId, eventSequence); + } } private static bool TryParsePayload( diff --git a/src/Sheddueller.Postgres/Internal/PostgresWakeSignal.cs b/src/Sheddueller.Postgres/Internal/PostgresWakeSignal.cs index f98fa03..0442abb 100644 --- a/src/Sheddueller.Postgres/Internal/PostgresWakeSignal.cs +++ b/src/Sheddueller.Postgres/Internal/PostgresWakeSignal.cs @@ -2,12 +2,18 @@ namespace Sheddueller.Postgres.Internal; +using Microsoft.Extensions.Logging; + using Npgsql; using Sheddueller.Runtime; -internal sealed class PostgresWakeSignal(ShedduellerPostgresOptions options) : IShedduellerWakeSignal, IDisposable +internal sealed class PostgresWakeSignal( + ShedduellerPostgresOptions options, + ILogger logger) : IShedduellerWakeSignal, IDisposable { + private static readonly TimeSpan ListenerRetryDelay = TimeSpan.FromSeconds(1); + private readonly SemaphoreSlim _signal = new(0); private readonly CancellationTokenSource _disposeTokenSource = new(); private readonly Lock _listenerLock = new(); @@ -60,9 +66,10 @@ private async Task ListenAsync() { return; } - catch (Exception) + catch (Exception exception) { - await Task.Delay(TimeSpan.FromSeconds(1), this._disposeTokenSource.Token).ConfigureAwait(false); + logger.PostgresWakeListenerRetrying(exception, this._options.SchemaName, (long)ListenerRetryDelay.TotalMilliseconds); + await Task.Delay(ListenerRetryDelay, this._disposeTokenSource.Token).ConfigureAwait(false); } } } @@ -77,6 +84,7 @@ private async Task ListenUntilDisconnectedAsync(CancellationToken cancellationTo await using var command = connection.CreateCommand(); command.CommandText = $"listen {PostgresNames.WakeupChannel};"; await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); + logger.PostgresWakeListenerStarted(this._options.SchemaName); while (!cancellationToken.IsCancellationRequested) { diff --git a/src/Sheddueller.Postgres/Internal/ShedduellerPostgresLoggerMessages.cs b/src/Sheddueller.Postgres/Internal/ShedduellerPostgresLoggerMessages.cs new file mode 100644 index 0000000..5b3fedb --- /dev/null +++ b/src/Sheddueller.Postgres/Internal/ShedduellerPostgresLoggerMessages.cs @@ -0,0 +1,69 @@ +namespace Microsoft.Extensions.Logging; + +internal static partial class ShedduellerPostgresLoggerMessages +{ + private const int EventIdStart = 1200; + + [LoggerMessage( + EventIdStart + 0, + LogLevel.Debug, + "PostgreSQL wake listener started for schema {SchemaName}.")] + public static partial void PostgresWakeListenerStarted( + this ILogger logger, + string schemaName); + + [LoggerMessage( + EventIdStart + 1, + LogLevel.Warning, + "PostgreSQL wake listener for schema {SchemaName} disconnected; retrying in {RetryDelayMs:D} ms.")] + public static partial void PostgresWakeListenerRetrying( + this ILogger logger, + Exception exception, + string schemaName, + long retryDelayMs); + + [LoggerMessage( + EventIdStart + 10, + LogLevel.Debug, + "PostgreSQL job-event listener started for schema {SchemaName}.")] + public static partial void PostgresJobEventListenerStarted( + this ILogger logger, + string schemaName); + + [LoggerMessage( + EventIdStart + 11, + LogLevel.Warning, + "PostgreSQL job-event listener for schema {SchemaName} disconnected; retrying in {RetryDelayMs:D} ms.")] + public static partial void PostgresJobEventListenerRetrying( + this ILogger logger, + Exception exception, + string schemaName, + long retryDelayMs); + + [LoggerMessage( + EventIdStart + 12, + LogLevel.Debug, + "Ignored PostgreSQL job-event notification with invalid payload for schema {SchemaName}.")] + public static partial void PostgresJobEventNotificationPayloadInvalid( + this ILogger logger, + string schemaName); + + [LoggerMessage( + EventIdStart + 13, + LogLevel.Debug, + "PostgreSQL job event {EventSequence} for job {JobId} was not found after notification.")] + public static partial void PostgresJobEventNotificationMissing( + this ILogger logger, + Guid jobId, + long eventSequence); + + [LoggerMessage( + EventIdStart + 14, + LogLevel.Warning, + "Failed to publish PostgreSQL job event {EventSequence} for job {JobId}.")] + public static partial void PostgresJobEventNotificationFailed( + this ILogger logger, + Exception exception, + Guid jobId, + long eventSequence); +} diff --git a/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs b/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs index 4773669..3eaf1f6 100644 --- a/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs +++ b/src/Sheddueller.Worker/Internal/ShedduellerWorker.cs @@ -6,6 +6,7 @@ namespace Sheddueller.Worker.Internal; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Sheddueller; @@ -21,7 +22,9 @@ internal sealed class ShedduellerWorker( TimeProvider timeProvider, IShedduellerWakeSignal wakeSignal, IShedduellerNodeIdProvider nodeIdProvider, - IJobEventSink jobEventSink) : BackgroundService + IJobEventSink jobEventSink, + ILogger logger, + ILogger jobContextLogger) : BackgroundService { private readonly IServiceProvider _serviceProvider = serviceProvider; private readonly IServiceScopeFactory _scopeFactory = scopeFactory; @@ -30,11 +33,14 @@ internal sealed class ShedduellerWorker( private readonly IShedduellerWakeSignal _wakeSignal = wakeSignal; private readonly IShedduellerNodeIdProvider _nodeIdProvider = nodeIdProvider; private readonly IJobEventSink _jobEventSink = jobEventSink; + private readonly ILogger _logger = logger; + private readonly ILogger _jobContextLogger = jobContextLogger; private readonly ConcurrentDictionary _runningJobs = new(); protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var store = this._serviceProvider.GetRequiredService(); + this._logger.WorkerStarted(this._nodeIdProvider.NodeId); try { @@ -57,6 +63,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } claimedJob = true; + this._logger.JobClaimed(claimed.Job.JobId, claimed.Job.AttemptCount, this._nodeIdProvider.NodeId); this.TrackRunningJob(this.ExecuteClaimedJobAsync(store, claimed.Job, stoppingToken)); } @@ -72,8 +79,14 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { // Shutdown stops claiming. Running jobs are awaited below so terminal state can be recorded. } + catch (Exception exception) + { + this._logger.WorkerFailed(exception, this._nodeIdProvider.NodeId); + throw; + } await this.WaitForRunningJobsAsync().ConfigureAwait(false); + this._logger.WorkerStopped(this._nodeIdProvider.NodeId); } private async ValueTask WaitForWorkOrCapacityAsync(CancellationToken stoppingToken) @@ -104,29 +117,42 @@ private async Task ExecuteClaimedJobAsync(IJobStore store, ClaimedJob job, Cance try { await this.InvokeClaimedJobAsync(job, executionTokenSource.Token).ConfigureAwait(false); - await store + var completed = await store .MarkCompletedAsync(new CompleteJobRequest(job.JobId, this._nodeIdProvider.NodeId, job.LeaseToken, this._timeProvider.GetUtcNow()), CancellationToken.None) .ConfigureAwait(false); + if (completed) + { + this._logger.JobCompleted(job.JobId, job.AttemptCount, this._nodeIdProvider.NodeId); + } } catch (OperationCanceledException) when (executionTokenSource.IsCancellationRequested) { if (cancellationState.CancellationRequestedAtUtc is not null) { - await store + var observed = await store .MarkCancellationObservedAsync( new ObserveJobCancellationRequest(job.JobId, this._nodeIdProvider.NodeId, job.LeaseToken, this._timeProvider.GetUtcNow()), CancellationToken.None) .ConfigureAwait(false); + if (observed) + { + this._logger.JobCancellationObserved(job.JobId, job.AttemptCount, this._nodeIdProvider.NodeId); + } } else { - await store + var released = await store .ReleaseJobAsync(new ReleaseJobRequest(job.JobId, this._nodeIdProvider.NodeId, job.LeaseToken, this._timeProvider.GetUtcNow()), CancellationToken.None) .ConfigureAwait(false); + if (released) + { + this._logger.JobReleased(job.JobId, job.AttemptCount, this._nodeIdProvider.NodeId); + } } } catch (Exception exception) { + this._logger.JobFailed(exception, job.JobId, job.AttemptCount, this._nodeIdProvider.NodeId); await store .MarkFailedAsync( new FailJobRequest(job.JobId, this._nodeIdProvider.NodeId, job.LeaseToken, this._timeProvider.GetUtcNow(), CreateFailureInfo(exception)), @@ -136,7 +162,7 @@ await store finally { await executionTokenSource.CancelAsync().ConfigureAwait(false); - await WaitForHeartbeatTaskAsync(heartbeatTask).ConfigureAwait(false); + await this.WaitForHeartbeatTaskAsync(heartbeatTask, job).ConfigureAwait(false); executionTokenSource.Dispose(); } } @@ -149,7 +175,7 @@ private async ValueTask InvokeClaimedJobAsync(ClaimedJob job, CancellationToken var serializableParameterTypes = methodParameterTypes .Where((_, index) => parameterBindings[index].Kind == JobMethodParameterBindingKind.Serialized) .ToArray(); - var jobContext = new JobContext(job.JobId, job.AttemptCount, this._jobEventSink, executionToken); + var jobContext = new JobContext(job.JobId, job.AttemptCount, this._jobEventSink, this._jobContextLogger, executionToken); var scope = this._scopeFactory.CreateAsyncScope(); await using (scope.ConfigureAwait(false)) @@ -282,25 +308,7 @@ private static object ResolveBoundService( private static IReadOnlyList NormalizeParameterBindings( Type[] methodParameterTypes, IReadOnlyList? parameterBindings) - { - if (parameterBindings is { Count: > 0 }) - { - return parameterBindings; - } - - var inferred = new JobMethodParameterBinding[methodParameterTypes.Length]; - for (var i = 0; i < methodParameterTypes.Length; i++) - { - inferred[i] = methodParameterTypes[i] switch - { - Type type when type == typeof(CancellationToken) => new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken), - Type type when type == typeof(IJobContext) => new JobMethodParameterBinding(JobMethodParameterBindingKind.JobContext), - _ => new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized), - }; - } - - return inferred; - } + => JobMethodParameterBindingResolver.Normalize(methodParameterTypes, parameterBindings); private void TrackRunningJob(Task executionTask) { @@ -335,6 +343,7 @@ await store if (recovered > 0 || materialized > 0) { this._wakeSignal.Notify(); + this._logger.WorkerPeriodicStoreWorkCompleted(this._nodeIdProvider.NodeId, recovered, materialized); } } @@ -359,6 +368,7 @@ private async Task RenewLeaseUntilStoppedAsync( if (!renewed) { + this._logger.JobLeaseRenewalLost(job.JobId, job.AttemptCount, this._nodeIdProvider.NodeId); await executionTokenSource.CancelAsync().ConfigureAwait(false); return; } @@ -371,6 +381,7 @@ private async Task RenewLeaseUntilStoppedAsync( if (cancellationRequestedAtUtc is not null && cancellationState.CancellationRequestedAtUtc is null) { cancellationState.CancellationRequestedAtUtc = cancellationRequestedAtUtc; + this._logger.JobCancellationRequestedObserved(job.JobId, job.AttemptCount, this._nodeIdProvider.NodeId); await executionTokenSource.CancelAsync().ConfigureAwait(false); return; } @@ -383,14 +394,15 @@ private async Task RenewLeaseUntilStoppedAsync( } [SuppressMessage("Design", "CA1031:Do not catch general exception types", Justification = "Heartbeat failures should not fault the job execution cleanup path.")] - private static async ValueTask WaitForHeartbeatTaskAsync(Task heartbeatTask) + private async ValueTask WaitForHeartbeatTaskAsync(Task heartbeatTask, ClaimedJob job) { try { await heartbeatTask.ConfigureAwait(false); } - catch + catch (Exception exception) { + this._logger.JobHeartbeatFailed(exception, job.JobId, job.AttemptCount, this._nodeIdProvider.NodeId); // Failure to renew will be handled by lease expiry recovery. } } diff --git a/src/Sheddueller.Worker/Internal/ShedduellerWorkerLoggerMessages.cs b/src/Sheddueller.Worker/Internal/ShedduellerWorkerLoggerMessages.cs new file mode 100644 index 0000000..fdb43c1 --- /dev/null +++ b/src/Sheddueller.Worker/Internal/ShedduellerWorkerLoggerMessages.cs @@ -0,0 +1,123 @@ +namespace Microsoft.Extensions.Logging; + +internal static partial class ShedduellerWorkerLoggerMessages +{ + private const int EventIdStart = 1100; + + [LoggerMessage( + EventIdStart + 0, + LogLevel.Information, + "Sheddueller worker node {NodeId} started.")] + public static partial void WorkerStarted( + this ILogger logger, + string nodeId); + + [LoggerMessage( + EventIdStart + 1, + LogLevel.Information, + "Sheddueller worker node {NodeId} stopped.")] + public static partial void WorkerStopped( + this ILogger logger, + string nodeId); + + [LoggerMessage( + EventIdStart + 2, + LogLevel.Error, + "Sheddueller worker node {NodeId} stopped unexpectedly.")] + public static partial void WorkerFailed( + this ILogger logger, + Exception exception, + string nodeId); + + [LoggerMessage( + EventIdStart + 10, + LogLevel.Debug, + "Claimed job {JobId} for attempt {AttemptNumber} on node {NodeId}.")] + public static partial void JobClaimed( + this ILogger logger, + Guid jobId, + int attemptNumber, + string nodeId); + + [LoggerMessage( + EventIdStart + 20, + LogLevel.Debug, + "Completed job {JobId} attempt {AttemptNumber} on node {NodeId}.")] + public static partial void JobCompleted( + this ILogger logger, + Guid jobId, + int attemptNumber, + string nodeId); + + [LoggerMessage( + EventIdStart + 21, + LogLevel.Error, + "Job {JobId} attempt {AttemptNumber} failed on node {NodeId}.")] + public static partial void JobFailed( + this ILogger logger, + Exception exception, + Guid jobId, + int attemptNumber, + string nodeId); + + [LoggerMessage( + EventIdStart + 30, + LogLevel.Information, + "Cancellation was observed for job {JobId} attempt {AttemptNumber} on node {NodeId}.")] + public static partial void JobCancellationObserved( + this ILogger logger, + Guid jobId, + int attemptNumber, + string nodeId); + + [LoggerMessage( + EventIdStart + 31, + LogLevel.Warning, + "Released job {JobId} attempt {AttemptNumber} on node {NodeId} before completion.")] + public static partial void JobReleased( + this ILogger logger, + Guid jobId, + int attemptNumber, + string nodeId); + + [LoggerMessage( + EventIdStart + 32, + LogLevel.Warning, + "Lost lease for job {JobId} attempt {AttemptNumber} on node {NodeId}.")] + public static partial void JobLeaseRenewalLost( + this ILogger logger, + Guid jobId, + int attemptNumber, + string nodeId); + + [LoggerMessage( + EventIdStart + 33, + LogLevel.Information, + "Cancellation was requested for job {JobId} attempt {AttemptNumber} on node {NodeId}.")] + public static partial void JobCancellationRequestedObserved( + this ILogger logger, + Guid jobId, + int attemptNumber, + string nodeId); + + [LoggerMessage( + EventIdStart + 34, + LogLevel.Warning, + "Heartbeat task failed for job {JobId} attempt {AttemptNumber} on node {NodeId}.")] + public static partial void JobHeartbeatFailed( + this ILogger logger, + Exception exception, + Guid jobId, + int attemptNumber, + string nodeId); + + [LoggerMessage( + EventIdStart + 40, + LogLevel.Information, + "Worker node {NodeId} recovered {RecoveredCount} expired leases and materialized {MaterializedCount} recurring schedule occurrences.")] + public static partial void WorkerPeriodicStoreWorkCompleted( + this ILogger logger, + string nodeId, + int recoveredCount, + int materializedCount); +} diff --git a/src/Sheddueller/Enqueueing/JobEnqueuer.cs b/src/Sheddueller/Enqueueing/JobEnqueuer.cs index b1328e6..c081b07 100644 --- a/src/Sheddueller/Enqueueing/JobEnqueuer.cs +++ b/src/Sheddueller/Enqueueing/JobEnqueuer.cs @@ -2,6 +2,7 @@ namespace Sheddueller.Enqueueing; using System.Linq.Expressions; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Sheddueller.Runtime; @@ -13,7 +14,8 @@ internal sealed class JobEnqueuer( IJobPayloadSerializer serializer, IOptions options, TimeProvider timeProvider, - IShedduellerWakeSignal wakeSignal) : IJobEnqueuer + IShedduellerWakeSignal wakeSignal, + ILogger logger) : IJobEnqueuer { public ValueTask EnqueueAsync( Expression> work, @@ -73,11 +75,14 @@ public async ValueTask> EnqueueManyAsync( throw new InvalidOperationException("The job store returned a result count that does not match the submitted batch size."); } - if (results.Any(result => result.WasEnqueued)) + var enqueuedCount = results.Count(result => result.WasEnqueued); + if (enqueuedCount > 0) { wakeSignal.Notify(); } + logger.JobsBatchEnqueued(requests.Length, enqueuedCount); + var jobIds = new Guid[results.Count]; for (var i = 0; i < results.Count; i++) { @@ -102,6 +107,11 @@ private async ValueTask EnqueueCoreAsync( if (result.WasEnqueued) { wakeSignal.Notify(); + logger.JobEnqueued(result.JobId, result.EnqueueSequence); + } + else + { + logger.JobEnqueueDeduplicated(result.JobId, result.EnqueueSequence); } return result.JobId; diff --git a/src/Sheddueller/Inspection/Jobs/JobInspectionDetail.cs b/src/Sheddueller/Inspection/Jobs/JobInspectionDetail.cs index c87c678..9d9e9e9 100644 --- a/src/Sheddueller/Inspection/Jobs/JobInspectionDetail.cs +++ b/src/Sheddueller/Inspection/Jobs/JobInspectionDetail.cs @@ -10,6 +10,11 @@ public sealed record JobInspectionDetail( DateTimeOffset? LeaseExpiresAtUtc, DateTimeOffset? ScheduledFireAtUtc) { + /// + /// Reconstructed persisted invocation metadata for this job. + /// + public JobInvocationInspection? Invocation { get; init; } + /// /// Jobs cloned from this failed job. /// diff --git a/src/Sheddueller/Inspection/Jobs/JobInvocationDisplayFormatter.cs b/src/Sheddueller/Inspection/Jobs/JobInvocationDisplayFormatter.cs new file mode 100644 index 0000000..5522c8e --- /dev/null +++ b/src/Sheddueller/Inspection/Jobs/JobInvocationDisplayFormatter.cs @@ -0,0 +1,132 @@ +namespace Sheddueller.Inspection.Jobs; + +using System.Globalization; +using System.Text; +using System.Text.Json; + +using Sheddueller.Storage; + +internal static class JobInvocationDisplayFormatter +{ + public static string Format( + string serviceType, + string methodName, + IReadOnlyList parameters) + { + ArgumentException.ThrowIfNullOrWhiteSpace(serviceType); + ArgumentException.ThrowIfNullOrWhiteSpace(methodName); + ArgumentNullException.ThrowIfNull(parameters); + + var handler = string.Create(CultureInfo.InvariantCulture, $"{ShortTypeName(serviceType)}.{methodName}"); + if (parameters.Count == 0) + { + return string.Concat(handler, "()"); + } + + if (parameters.Count <= 2) + { + return string.Create( + CultureInfo.InvariantCulture, + $"{handler}({string.Join(", ", parameters.Select(FormatArgument))})"); + } + + var builder = new StringBuilder(); + builder.Append(handler); + builder.AppendLine("("); + + for (var i = 0; i < parameters.Count; i++) + { + builder.Append(" "); + builder.Append(FormatArgument(parameters[i])); + if (i < parameters.Count - 1) + { + builder.Append(','); + builder.AppendLine(); + } + } + + builder.Append(')'); + return builder.ToString(); + } + + private static string FormatArgument(JobInvocationParameterInspection parameter) + => parameter.Binding.Kind switch + { + JobMethodParameterBindingKind.Serialized => FormatSerializedArgument(parameter), + JobMethodParameterBindingKind.CancellationToken => "CancellationToken", + JobMethodParameterBindingKind.JobContext => "Job.Context", + JobMethodParameterBindingKind.Service => string.Create( + CultureInfo.InvariantCulture, + $"Job.Resolve<{ShortTypeName(parameter.Binding.ServiceType ?? parameter.ParameterType)}>()"), + _ => string.Create(CultureInfo.InvariantCulture, $"<{parameter.Binding.Kind}>"), + }; + + private static string FormatSerializedArgument(JobInvocationParameterInspection parameter) + { + if (parameter.SerializedValueJson is null) + { + return string.Create(CultureInfo.InvariantCulture, $""); + } + + try + { + using var document = JsonDocument.Parse(parameter.SerializedValueJson); + return document.RootElement.ValueKind switch + { + JsonValueKind.String => FormatStringLiteral(document.RootElement.GetString() ?? string.Empty), + JsonValueKind.Number or JsonValueKind.True or JsonValueKind.False or JsonValueKind.Null => document.RootElement.GetRawText(), + JsonValueKind.Object or JsonValueKind.Array => JsonSerializer.Serialize(document.RootElement), + _ => string.Create(CultureInfo.InvariantCulture, $""), + }; + } + catch (JsonException) + { + return string.Create(CultureInfo.InvariantCulture, $""); + } + } + + private static string FormatStringLiteral(string value) + { + var builder = new StringBuilder(value.Length + 2); + builder.Append('"'); + + foreach (var character in value) + { + _ = character switch + { + '"' => builder.Append("\\\""), + '\\' => builder.Append("\\\\"), + '\0' => builder.Append("\\0"), + '\a' => builder.Append("\\a"), + '\b' => builder.Append("\\b"), + '\f' => builder.Append("\\f"), + '\n' => builder.Append("\\n"), + '\r' => builder.Append("\\r"), + '\t' => builder.Append("\\t"), + '\v' => builder.Append("\\v"), + _ when char.IsControl(character) => builder.Append( + string.Create(CultureInfo.InvariantCulture, $"\\u{(int)character:X4}")), + _ => builder.Append(character), + }; + } + + builder.Append('"'); + return builder.ToString(); + } + + private static string ShortTypeName(string typeName) + { + var typeDelimiterIndex = typeName.IndexOf(',', StringComparison.Ordinal); + if (typeDelimiterIndex >= 0) + { + typeName = typeName[..typeDelimiterIndex]; + } + + var separatorIndex = Math.Max( + typeName.LastIndexOf('.'), + typeName.LastIndexOf('+')); + return separatorIndex < 0 || separatorIndex == typeName.Length - 1 + ? typeName + : typeName[(separatorIndex + 1)..]; + } +} diff --git a/src/Sheddueller/Inspection/Jobs/JobInvocationInspection.cs b/src/Sheddueller/Inspection/Jobs/JobInvocationInspection.cs new file mode 100644 index 0000000..7e781aa --- /dev/null +++ b/src/Sheddueller/Inspection/Jobs/JobInvocationInspection.cs @@ -0,0 +1,17 @@ +namespace Sheddueller.Inspection.Jobs; + +using Sheddueller.Storage; + +/// +/// Reconstructed persisted invocation metadata for a job. +/// +public sealed record JobInvocationInspection( + JobInvocationTargetKind TargetKind, + string ServiceType, + string MethodName, + string ReconstructedCall, + IReadOnlyList Parameters, + string SerializedArgumentsContentType, + long SerializedArgumentsByteCount, + JobSerializedArgumentsInspectionStatus SerializedArgumentsStatus, + string? SerializedArgumentsStatusMessage = null); diff --git a/src/Sheddueller/Inspection/Jobs/JobInvocationParameterInspection.cs b/src/Sheddueller/Inspection/Jobs/JobInvocationParameterInspection.cs new file mode 100644 index 0000000..c1f1bc3 --- /dev/null +++ b/src/Sheddueller/Inspection/Jobs/JobInvocationParameterInspection.cs @@ -0,0 +1,12 @@ +namespace Sheddueller.Inspection.Jobs; + +using Sheddueller.Storage; + +/// +/// Reconstructed persisted invocation metadata for one job method parameter. +/// +public sealed record JobInvocationParameterInspection( + int ParameterIndex, + string ParameterType, + JobMethodParameterBinding Binding, + string? SerializedValueJson = null); diff --git a/src/Sheddueller/Inspection/Jobs/JobSerializedArgumentsInspectionStatus.cs b/src/Sheddueller/Inspection/Jobs/JobSerializedArgumentsInspectionStatus.cs new file mode 100644 index 0000000..a0bd23d --- /dev/null +++ b/src/Sheddueller/Inspection/Jobs/JobSerializedArgumentsInspectionStatus.cs @@ -0,0 +1,32 @@ +namespace Sheddueller.Inspection.Jobs; + +/// +/// Describes whether persisted serialized job arguments can be displayed for inspection. +/// +public enum JobSerializedArgumentsInspectionStatus +{ + /// + /// The serialized arguments were parsed and mapped to invocation parameters. + /// + Displayable = 0, + + /// + /// The payload content type is not understood by the built-in inspection renderer. + /// + UnsupportedContentType = 1, + + /// + /// The payload exceeds the inspection display limit. + /// + TooLarge = 2, + + /// + /// The payload was expected to be displayable but could not be parsed. + /// + InvalidPayload = 3, + + /// + /// The payload argument count does not match the reconstructed invocation. + /// + ArgumentCountMismatch = 4, +} diff --git a/src/Sheddueller/Logging/ShedduellerLoggerMessages.cs b/src/Sheddueller/Logging/ShedduellerLoggerMessages.cs new file mode 100644 index 0000000..b38f86b --- /dev/null +++ b/src/Sheddueller/Logging/ShedduellerLoggerMessages.cs @@ -0,0 +1,105 @@ +namespace Microsoft.Extensions.Logging; + +internal static partial class ShedduellerLoggerMessages +{ + private const int EventIdStart = 1000; + + [LoggerMessage( + EventIdStart + 0, + LogLevel.Debug, + "Enqueued job {JobId} with sequence {EnqueueSequence}.")] + public static partial void JobEnqueued( + this ILogger logger, + Guid jobId, + long enqueueSequence); + + [LoggerMessage( + EventIdStart + 1, + LogLevel.Debug, + "Reused existing idempotent job {JobId} with sequence {EnqueueSequence}.")] + public static partial void JobEnqueueDeduplicated( + this ILogger logger, + Guid jobId, + long enqueueSequence); + + [LoggerMessage( + EventIdStart + 2, + LogLevel.Debug, + "Submitted {SubmittedCount} jobs and enqueued {EnqueuedCount} new jobs.")] + public static partial void JobsBatchEnqueued( + this ILogger logger, + int submittedCount, + int enqueuedCount); + + [LoggerMessage( + EventIdStart + 10, + LogLevel.Debug, + "Cancel request for job {JobId} returned {Result}.")] + public static partial void JobCancellationRequested( + this ILogger logger, + Guid jobId, + string result); + + [LoggerMessage( + EventIdStart + 20, + LogLevel.Debug, + "Recurring schedule {ScheduleKey} upsert returned {Result}.")] + public static partial void RecurringScheduleUpserted( + this ILogger logger, + string scheduleKey, + string result); + + [LoggerMessage( + EventIdStart + 21, + LogLevel.Debug, + "Recurring schedule {ScheduleKey} trigger returned {Status}.")] + public static partial void RecurringScheduleTriggered( + this ILogger logger, + string scheduleKey, + string status); + + [LoggerMessage( + EventIdStart + 22, + LogLevel.Debug, + "Recurring schedule {ScheduleKey} delete returned {Deleted}.")] + public static partial void RecurringScheduleDeleted( + this ILogger logger, + string scheduleKey, + bool deleted); + + [LoggerMessage( + EventIdStart + 23, + LogLevel.Debug, + "Recurring schedule {ScheduleKey} pause returned {Paused}.")] + public static partial void RecurringSchedulePaused( + this ILogger logger, + string scheduleKey, + bool paused); + + [LoggerMessage( + EventIdStart + 24, + LogLevel.Debug, + "Recurring schedule {ScheduleKey} resume returned {Resumed}.")] + public static partial void RecurringScheduleResumed( + this ILogger logger, + string scheduleKey, + bool resumed); + + [LoggerMessage( + EventIdStart + 30, + LogLevel.Debug, + "Set concurrency group {GroupKey} limit to {Limit}.")] + public static partial void ConcurrencyGroupLimitSet( + this ILogger logger, + string groupKey, + int limit); + + [LoggerMessage( + EventIdStart + 40, + LogLevel.Warning, + "Failed to append durable job event for job {JobId}.")] + public static partial void JobEventAppendFailed( + this ILogger logger, + Exception exception, + Guid jobId); +} diff --git a/src/Sheddueller/Runtime/ConcurrencyGroupManager.cs b/src/Sheddueller/Runtime/ConcurrencyGroupManager.cs index 497df80..b8f46cd 100644 --- a/src/Sheddueller/Runtime/ConcurrencyGroupManager.cs +++ b/src/Sheddueller/Runtime/ConcurrencyGroupManager.cs @@ -1,9 +1,15 @@ namespace Sheddueller.Runtime; +using Microsoft.Extensions.Logging; + using Sheddueller.Enqueueing; using Sheddueller.Storage; -internal sealed class ConcurrencyGroupManager(IJobStore store, TimeProvider timeProvider, IShedduellerWakeSignal wakeSignal) : IConcurrencyGroupManager +internal sealed class ConcurrencyGroupManager( + IJobStore store, + TimeProvider timeProvider, + IShedduellerWakeSignal wakeSignal, + ILogger logger) : IConcurrencyGroupManager { public async ValueTask SetLimitAsync(string groupKey, int limit, CancellationToken cancellationToken = default) { @@ -18,6 +24,7 @@ await store .SetConcurrencyLimitAsync(new SetConcurrencyLimitRequest(groupKey, limit, timeProvider.GetUtcNow()), cancellationToken) .ConfigureAwait(false); wakeSignal.Notify(); + logger.ConcurrencyGroupLimitSet(groupKey, limit); } public ValueTask GetConfiguredLimitAsync(string groupKey, CancellationToken cancellationToken = default) diff --git a/src/Sheddueller/Runtime/JobContext.cs b/src/Sheddueller/Runtime/JobContext.cs index 9c9592c..4b58e32 100644 --- a/src/Sheddueller/Runtime/JobContext.cs +++ b/src/Sheddueller/Runtime/JobContext.cs @@ -2,12 +2,15 @@ namespace Sheddueller.Runtime; using System.Diagnostics.CodeAnalysis; +using Microsoft.Extensions.Logging; + using Sheddueller.Storage; internal sealed class JobContext( Guid jobId, int attemptNumber, IJobEventSink eventSink, + ILogger logger, CancellationToken cancellationToken) : IJobContext { public Guid JobId { get; } = jobId; @@ -65,8 +68,9 @@ private async ValueTask AppendBestEffortAsync( { throw; } - catch (Exception) + catch (Exception exception) { + logger.JobEventAppendFailed(exception, request.JobId); // Best-effort telemetry must not fail the owning job. } } diff --git a/src/Sheddueller/Runtime/JobManager.cs b/src/Sheddueller/Runtime/JobManager.cs index b102c02..f318000 100644 --- a/src/Sheddueller/Runtime/JobManager.cs +++ b/src/Sheddueller/Runtime/JobManager.cs @@ -1,11 +1,20 @@ namespace Sheddueller.Runtime; +using Microsoft.Extensions.Logging; + using Sheddueller.Storage; internal sealed class JobManager( IJobStore store, - TimeProvider timeProvider) : IJobManager + TimeProvider timeProvider, + ILogger logger) : IJobManager { - public ValueTask CancelAsync(Guid jobId, CancellationToken cancellationToken = default) - => store.CancelAsync(new CancelJobRequest(jobId, timeProvider.GetUtcNow()), cancellationToken); + public async ValueTask CancelAsync(Guid jobId, CancellationToken cancellationToken = default) + { + var result = await store.CancelAsync(new CancelJobRequest(jobId, timeProvider.GetUtcNow()), cancellationToken) + .ConfigureAwait(false); + logger.JobCancellationRequested(jobId, result.ToString()); + + return result; + } } diff --git a/src/Sheddueller/Runtime/RecurringScheduleManager.cs b/src/Sheddueller/Runtime/RecurringScheduleManager.cs index 3b87387..8704988 100644 --- a/src/Sheddueller/Runtime/RecurringScheduleManager.cs +++ b/src/Sheddueller/Runtime/RecurringScheduleManager.cs @@ -3,6 +3,7 @@ namespace Sheddueller.Runtime; using System.Linq.Expressions; using System.Runtime.CompilerServices; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Sheddueller.Enqueueing; @@ -15,7 +16,8 @@ internal sealed class RecurringScheduleManager( IJobPayloadSerializer serializer, IOptions options, TimeProvider timeProvider, - IShedduellerWakeSignal wakeSignal) : IRecurringScheduleManager + IShedduellerWakeSignal wakeSignal, + ILogger logger) : IRecurringScheduleManager { public ValueTask CreateOrUpdateAsync( string scheduleKey, @@ -65,25 +67,36 @@ public async ValueTask TriggerAsync( wakeSignal.Notify(); } + logger.RecurringScheduleTriggered(scheduleKey, result.Status.ToString()); + return result; } - public ValueTask DeleteAsync(string scheduleKey, CancellationToken cancellationToken = default) + public async ValueTask DeleteAsync(string scheduleKey, CancellationToken cancellationToken = default) { SubmissionValidator.ValidateScheduleKey(scheduleKey); - return store.DeleteRecurringScheduleAsync(scheduleKey, cancellationToken); + var deleted = await store.DeleteRecurringScheduleAsync(scheduleKey, cancellationToken).ConfigureAwait(false); + logger.RecurringScheduleDeleted(scheduleKey, deleted); + + return deleted; } - public ValueTask PauseAsync(string scheduleKey, CancellationToken cancellationToken = default) + public async ValueTask PauseAsync(string scheduleKey, CancellationToken cancellationToken = default) { SubmissionValidator.ValidateScheduleKey(scheduleKey); - return store.PauseRecurringScheduleAsync(scheduleKey, timeProvider.GetUtcNow(), cancellationToken); + var paused = await store.PauseRecurringScheduleAsync(scheduleKey, timeProvider.GetUtcNow(), cancellationToken).ConfigureAwait(false); + logger.RecurringSchedulePaused(scheduleKey, paused); + + return paused; } - public ValueTask ResumeAsync(string scheduleKey, CancellationToken cancellationToken = default) + public async ValueTask ResumeAsync(string scheduleKey, CancellationToken cancellationToken = default) { SubmissionValidator.ValidateScheduleKey(scheduleKey); - return store.ResumeRecurringScheduleAsync(scheduleKey, timeProvider.GetUtcNow(), cancellationToken); + var resumed = await store.ResumeRecurringScheduleAsync(scheduleKey, timeProvider.GetUtcNow(), cancellationToken).ConfigureAwait(false); + logger.RecurringScheduleResumed(scheduleKey, resumed); + + return resumed; } public ValueTask GetAsync(string scheduleKey, CancellationToken cancellationToken = default) @@ -175,6 +188,7 @@ private async ValueTask CreateOrUpdateCoreAsync( var result = await store.CreateOrUpdateRecurringScheduleAsync(request, cancellationToken).ConfigureAwait(false); wakeSignal.Notify(); + logger.RecurringScheduleUpserted(scheduleKey, result.ToString()); return result; } diff --git a/src/Sheddueller/Sheddueller.csproj b/src/Sheddueller/Sheddueller.csproj index 80045b6..114be10 100644 --- a/src/Sheddueller/Sheddueller.csproj +++ b/src/Sheddueller/Sheddueller.csproj @@ -2,6 +2,7 @@ + all diff --git a/src/Sheddueller/ShedduellerServiceCollectionExtensions.cs b/src/Sheddueller/ShedduellerServiceCollectionExtensions.cs index e3c623d..d0c1ef9 100644 --- a/src/Sheddueller/ShedduellerServiceCollectionExtensions.cs +++ b/src/Sheddueller/ShedduellerServiceCollectionExtensions.cs @@ -27,6 +27,7 @@ public static IServiceCollection AddSheddueller( { ArgumentNullException.ThrowIfNull(services); + services.AddLogging(); services.AddOptions(); services.TryAddSingleton(TimeProvider.System); services.TryAddSingleton(); diff --git a/src/Sheddueller/Storage/JobMethodParameterBindingResolver.cs b/src/Sheddueller/Storage/JobMethodParameterBindingResolver.cs new file mode 100644 index 0000000..b33d533 --- /dev/null +++ b/src/Sheddueller/Storage/JobMethodParameterBindingResolver.cs @@ -0,0 +1,59 @@ +namespace Sheddueller.Storage; + +internal static class JobMethodParameterBindingResolver +{ + public static IReadOnlyList Normalize( + IReadOnlyList methodParameterTypes, + IReadOnlyList? parameterBindings) + { + ArgumentNullException.ThrowIfNull(methodParameterTypes); + + if (parameterBindings is { Count: > 0 }) + { + return parameterBindings; + } + + var inferred = new JobMethodParameterBinding[methodParameterTypes.Count]; + for (var i = 0; i < methodParameterTypes.Count; i++) + { + inferred[i] = CreateInferredBinding(methodParameterTypes[i]); + } + + return inferred; + } + + public static IReadOnlyList Normalize( + IReadOnlyList methodParameterTypes, + IReadOnlyList? parameterBindings) + { + ArgumentNullException.ThrowIfNull(methodParameterTypes); + + if (parameterBindings is { Count: > 0 }) + { + return parameterBindings; + } + + var inferred = new JobMethodParameterBinding[methodParameterTypes.Count]; + for (var i = 0; i < methodParameterTypes.Count; i++) + { + inferred[i] = CreateInferredBinding(methodParameterTypes[i]); + } + + return inferred; + } + + private static JobMethodParameterBinding CreateInferredBinding(Type parameterType) + => parameterType switch + { + Type type when type == typeof(CancellationToken) => new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken), + Type type when type == typeof(IJobContext) => new JobMethodParameterBinding(JobMethodParameterBindingKind.JobContext), + _ => new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized), + }; + + private static JobMethodParameterBinding CreateInferredBinding(string parameterType) + => string.Equals(parameterType, typeof(CancellationToken).AssemblyQualifiedName, StringComparison.Ordinal) + ? new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken) + : string.Equals(parameterType, typeof(IJobContext).AssemblyQualifiedName, StringComparison.Ordinal) + ? new JobMethodParameterBinding(JobMethodParameterBindingKind.JobContext) + : new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized); +} diff --git a/test/Sheddueller.Dashboard.Tests/DashboardEndpointTests.cs b/test/Sheddueller.Dashboard.Tests/DashboardEndpointTests.cs index 42f5c75..df0a2b3 100644 --- a/test/Sheddueller.Dashboard.Tests/DashboardEndpointTests.cs +++ b/test/Sheddueller.Dashboard.Tests/DashboardEndpointTests.cs @@ -16,6 +16,7 @@ namespace Sheddueller.Dashboard.Tests; using Sheddueller.Inspection.Metrics; using Sheddueller.Inspection.Nodes; using Sheddueller.Inspection.Schedules; +using Sheddueller.Serialization; using Sheddueller.Storage; using Shouldly; @@ -307,6 +308,13 @@ public async Task JobDetail_KnownJob_RendersDetailAndDefaultLogFilter() html.ShouldContain("href=\"jobs?handler=StubService.Run\""); html.ShouldContain("href=\"jobs?tag=tenant%3Aacme\""); html.ShouldContain("href=\"jobs?group=tenant-acme\""); + html.ShouldContain("Invocation"); + html.ShouldContain("StubService.Run("); + html.ShouldContain("permanent-failure"); + html.ShouldContain("Job.Resolve"); + html.ShouldContain("StubDependency"); + html.ShouldContain("Job.Context"); + html.ShouldContain("scheduler-owned"); html.ShouldContain("Lifecycle Timeline"); html.ShouldContain("Attempt 1 Failed"); html.ShouldContain("ConnectionTimeoutError: db-primary cluster unreachable"); @@ -525,6 +533,39 @@ private sealed class StubJobInspectionReader : IJobInspectionReader private static readonly DateTimeOffset CompletedAtUtc = DateTimeOffset.Parse("2026-04-20T12:09:00Z", CultureInfo.InvariantCulture); private static readonly DateTimeOffset CanceledAtUtc = DateTimeOffset.Parse("2026-04-20T12:02:00Z", CultureInfo.InvariantCulture); private static readonly DateTimeOffset LeaseExpiresAtUtc = DateTimeOffset.Parse("2026-04-20T12:10:00Z", CultureInfo.InvariantCulture); + private static readonly JobInvocationInspection Invocation = new( + JobInvocationTargetKind.Instance, + "Sheddueller.Dashboard.Tests.DashboardEndpointTests.StubService", + "Run", + string.Join( + Environment.NewLine, + "StubService.Run(", + " \"permanent-failure\",", + " Job.Resolve(),", + " Job.Context,", + " CancellationToken)"), + [ + new JobInvocationParameterInspection( + 0, + typeof(string).AssemblyQualifiedName!, + new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized), + "\"permanent-failure\""), + new JobInvocationParameterInspection( + 1, + "Sheddueller.Dashboard.Tests.DashboardEndpointTests.StubDependency", + new JobMethodParameterBinding(JobMethodParameterBindingKind.Service, "Sheddueller.Dashboard.Tests.DashboardEndpointTests.StubDependency")), + new JobInvocationParameterInspection( + 2, + typeof(IJobContext).AssemblyQualifiedName!, + new JobMethodParameterBinding(JobMethodParameterBindingKind.JobContext)), + new JobInvocationParameterInspection( + 3, + typeof(CancellationToken).AssemblyQualifiedName!, + new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken)), + ], + SystemTextJsonJobPayloadSerializer.JsonContentType, + SerializedArgumentsByteCount: 31, + JobSerializedArgumentsInspectionStatus.Displayable); private static readonly JobInspectionSummary Job = new( JobId, @@ -711,7 +752,7 @@ private static async IAsyncEnumerable ReadEventsCoreAsync( } private static JobInspectionDetail CreateDetail(JobInspectionSummary job) - => job.State == JobState.Queued + => (job.State == JobState.Queued ? new JobInspectionDetail( job, ClaimedAtUtc: null, @@ -723,7 +764,10 @@ private static JobInspectionDetail CreateDetail(JobInspectionSummary job) ClaimedAtUtc, ClaimedByNodeId: "worker-us-east-1a-04", job.State == JobState.Claimed ? LeaseExpiresAtUtc : null, - ScheduledFireAtUtc: null); + ScheduledFireAtUtc: null)) with + { + Invocation = Invocation, + }; } private sealed class StubJobManager : IJobManager diff --git a/test/Sheddueller.Dashboard.Tests/JobEventRetentionServiceLoggingTests.cs b/test/Sheddueller.Dashboard.Tests/JobEventRetentionServiceLoggingTests.cs new file mode 100644 index 0000000..22198b3 --- /dev/null +++ b/test/Sheddueller.Dashboard.Tests/JobEventRetentionServiceLoggingTests.cs @@ -0,0 +1,55 @@ +namespace Sheddueller.Dashboard.Tests; + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +using Sheddueller.Dashboard; +using Sheddueller.Dashboard.Internal; +using Sheddueller.Storage; +using Sheddueller.Tests.Logging; + +using Shouldly; + +public sealed class JobEventRetentionServiceLoggingTests +{ + [Fact] + public async Task Cleanup_NonZeroDeletedCount_LogsCleanupCount() + { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var store = new RecordingRetentionStore(3); + var services = new ServiceCollection(); + services.AddSingleton(store); + using var serviceProvider = services.BuildServiceProvider(); + using var logs = new TestLoggerProvider(); + using var loggerFactory = LoggerFactory.Create(builder => builder + .SetMinimumLevel(LogLevel.Trace) + .AddProvider(logs)); + using var service = new JobEventRetentionService( + serviceProvider, + Options.Create(new ShedduellerDashboardOptions { EventRetention = TimeSpan.FromDays(1) }), + loggerFactory.CreateLogger()); + + await service.StartAsync(cancellationTokenSource.Token); + await store.CleanupCalled.Task.WaitAsync(cancellationTokenSource.Token); + await service.StopAsync(cancellationTokenSource.Token); + + var entry = logs.SingleByEventId(1321); + entry.Level.ShouldBe(LogLevel.Information); + entry.Properties["DeletedCount"].ShouldBe(3); + entry.MessageTemplate.ShouldBe("Dashboard job-event retention cleanup deleted {DeletedCount} events."); + } + + private sealed class RecordingRetentionStore(int deletedCount) : IJobEventRetentionStore + { + public TaskCompletionSource CleanupCalled { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public ValueTask CleanupAsync( + TimeSpan retention, + CancellationToken cancellationToken = default) + { + this.CleanupCalled.TrySetResult(); + return ValueTask.FromResult(deletedCount); + } + } +} diff --git a/test/Sheddueller.Dashboard.Tests/Sheddueller.Dashboard.Tests.csproj b/test/Sheddueller.Dashboard.Tests/Sheddueller.Dashboard.Tests.csproj index 83ec564..bc871f6 100644 --- a/test/Sheddueller.Dashboard.Tests/Sheddueller.Dashboard.Tests.csproj +++ b/test/Sheddueller.Dashboard.Tests/Sheddueller.Dashboard.Tests.csproj @@ -7,4 +7,8 @@ + + + + diff --git a/test/Sheddueller.Postgres.Tests/PostgresJobEventListenerLoggingTests.cs b/test/Sheddueller.Postgres.Tests/PostgresJobEventListenerLoggingTests.cs new file mode 100644 index 0000000..86f01a7 --- /dev/null +++ b/test/Sheddueller.Postgres.Tests/PostgresJobEventListenerLoggingTests.cs @@ -0,0 +1,44 @@ +namespace Sheddueller.Postgres.Tests; + +using Microsoft.Extensions.Logging; + +using Sheddueller.Postgres.Internal; +using Sheddueller.Storage; +using Sheddueller.Tests.Logging; + +using Shouldly; + +public sealed class PostgresJobEventListenerLoggingTests +{ + [Fact] + public void Notification_InvalidPayload_LogsIgnoredPayload() + { + using var logs = new TestLoggerProvider(); + using var loggerFactory = LoggerFactory.Create(builder => builder + .SetMinimumLevel(LogLevel.Trace) + .AddProvider(logs)); + var listener = new PostgresJobEventListener( + new ShedduellerPostgresOptions + { + DataSource = null!, + SchemaName = "sheddueller", + }, + new NoOpJobEventNotifier(), + loggerFactory.CreateLogger()); + + listener.HandleNotificationPayload("not-a-valid-payload"); + + var entry = logs.SingleByEventId(1212); + entry.Level.ShouldBe(LogLevel.Debug); + entry.Properties["SchemaName"].ShouldBe("sheddueller"); + entry.MessageTemplate.ShouldBe("Ignored PostgreSQL job-event notification with invalid payload for schema {SchemaName}."); + } + + private sealed class NoOpJobEventNotifier : IJobEventNotifier + { + public ValueTask NotifyAsync( + JobEvent jobEvent, + CancellationToken cancellationToken = default) + => ValueTask.CompletedTask; + } +} diff --git a/test/Sheddueller.Postgres.Tests/Sheddueller.Postgres.Tests.csproj b/test/Sheddueller.Postgres.Tests/Sheddueller.Postgres.Tests.csproj index 9286d4f..3f5dcb3 100644 --- a/test/Sheddueller.Postgres.Tests/Sheddueller.Postgres.Tests.csproj +++ b/test/Sheddueller.Postgres.Tests/Sheddueller.Postgres.Tests.csproj @@ -8,6 +8,7 @@ + diff --git a/test/Sheddueller.ProviderContracts/InspectionContractTests.cs b/test/Sheddueller.ProviderContracts/InspectionContractTests.cs index 60ece44..b3b6038 100644 --- a/test/Sheddueller.ProviderContracts/InspectionContractTests.cs +++ b/test/Sheddueller.ProviderContracts/InspectionContractTests.cs @@ -1,5 +1,7 @@ namespace Sheddueller.ProviderContracts; +using System.Text.Json; + using Sheddueller.Inspection.ConcurrencyGroups; using Sheddueller.Inspection.Jobs; using Sheddueller.Inspection.Metrics; @@ -68,6 +70,85 @@ public async Task JobSummary_ClaimedAtUtc_IsVisibleForClaimedJobs() detail.ShouldNotBeNull().Summary.ClaimedAtUtc.ShouldBe(detail.ClaimedAtUtc); } + [Fact] + public async Task GetJob_InvocationMetadata_ReconstructsRuntimeBindingsAndJsonArguments() + { + await using var context = await this.CreateContextAsync(); + var jobId = Guid.NewGuid(); + + await context.Store.EnqueueAsync(CreateRequest( + jobId, + serviceType: typeof(InspectionInvocationService).AssemblyQualifiedName, + methodName: nameof(InspectionInvocationService.RunAsync), + methodParameterTypes: + [ + typeof(InspectionPayload).AssemblyQualifiedName!, + typeof(InspectionDependency).AssemblyQualifiedName!, + typeof(IJobContext).AssemblyQualifiedName!, + typeof(CancellationToken).AssemblyQualifiedName!, + ], + serializedArguments: new SerializedJobPayload( + SystemTextJsonJobPayloadSerializer.JsonContentType, + JsonSerializer.SerializeToUtf8Bytes(new[] { new { name = "alpha", count = 42 } })), + methodParameterBindings: + [ + new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized), + new JobMethodParameterBinding(JobMethodParameterBindingKind.Service, typeof(InspectionDependency).AssemblyQualifiedName), + new JobMethodParameterBinding(JobMethodParameterBindingKind.JobContext), + new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken), + ])); + + var detail = await context.Reader.GetJobAsync(jobId); + + var invocation = detail.ShouldNotBeNull().Invocation.ShouldNotBeNull(); + invocation.TargetKind.ShouldBe(JobInvocationTargetKind.Instance); + invocation.ServiceType.ShouldBe(typeof(InspectionInvocationService).AssemblyQualifiedName); + invocation.MethodName.ShouldBe(nameof(InspectionInvocationService.RunAsync)); + invocation.ReconstructedCall.ShouldBe(string.Join( + Environment.NewLine, + "InspectionInvocationService.RunAsync(", + " {\"name\":\"alpha\",\"count\":42},", + " Job.Resolve(),", + " Job.Context,", + " CancellationToken)")); + invocation.SerializedArgumentsContentType.ShouldBe(SystemTextJsonJobPayloadSerializer.JsonContentType); + invocation.SerializedArgumentsStatus.ShouldBe(JobSerializedArgumentsInspectionStatus.Displayable); + invocation.Parameters.Select(parameter => parameter.Binding.Kind).ShouldBe([ + JobMethodParameterBindingKind.Serialized, + JobMethodParameterBindingKind.Service, + JobMethodParameterBindingKind.JobContext, + JobMethodParameterBindingKind.CancellationToken, + ]); + var valueJson = invocation.Parameters[0].SerializedValueJson.ShouldNotBeNull(); + valueJson.ShouldContain("\"name\": \"alpha\""); + valueJson.ShouldContain("\"count\": 42"); + invocation.Parameters[1].Binding.ServiceType.ShouldBe(typeof(InspectionDependency).AssemblyQualifiedName); + invocation.Parameters.Skip(1).All(parameter => parameter.SerializedValueJson is null).ShouldBeTrue(); + } + + [Fact] + public async Task GetJob_InvocationMetadata_CustomPayloadReportsUnsupportedContentType() + { + await using var context = await this.CreateContextAsync(); + var jobId = Guid.NewGuid(); + + await context.Store.EnqueueAsync(CreateRequest( + jobId, + methodParameterTypes: [typeof(string).AssemblyQualifiedName!], + serializedArguments: new SerializedJobPayload("application/x-test", [1, 2, 3]), + methodParameterBindings: [new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized)])); + + var detail = await context.Reader.GetJobAsync(jobId); + + var invocation = detail.ShouldNotBeNull().Invocation.ShouldNotBeNull(); + invocation.SerializedArgumentsContentType.ShouldBe("application/x-test"); + invocation.ReconstructedCall.ShouldContain(""); + invocation.SerializedArgumentsByteCount.ShouldBe(3); + invocation.SerializedArgumentsStatus.ShouldBe(JobSerializedArgumentsInspectionStatus.UnsupportedContentType); + invocation.SerializedArgumentsStatusMessage.ShouldNotBeNull().ShouldContain("unsupported content type"); + invocation.Parameters.ShouldHaveSingleItem().SerializedValueJson.ShouldBeNull(); + } + [Fact] public async Task SearchJobs_HandlerSubstringSearch_MatchesAssemblyUnqualifiedHandler() { @@ -425,21 +506,27 @@ protected static EnqueueJobRequest CreateRequest( RetryBackoffKind? retryBackoffKind = null, TimeSpan? retryBaseDelay = null, string? serviceType = null, - string? methodName = null) + string? methodName = null, + IReadOnlyList? methodParameterTypes = null, + SerializedJobPayload? serializedArguments = null, + JobInvocationTargetKind invocationTargetKind = JobInvocationTargetKind.Instance, + IReadOnlyList? methodParameterBindings = null) => new( jobId, priority, serviceType ?? typeof(InspectionContractService).AssemblyQualifiedName!, methodName ?? nameof(InspectionContractService.RunAsync), - [typeof(CancellationToken).AssemblyQualifiedName!], - new SerializedJobPayload(SystemTextJsonJobPayloadSerializer.JsonContentType, "[]"u8.ToArray()), + methodParameterTypes ?? [typeof(CancellationToken).AssemblyQualifiedName!], + serializedArguments ?? new SerializedJobPayload(SystemTextJsonJobPayloadSerializer.JsonContentType, "[]"u8.ToArray()), groupKeys ?? [], DateTimeOffset.UtcNow, notBeforeUtc, maxAttempts, retryBackoffKind, retryBaseDelay, - Tags: tags); + Tags: tags, + InvocationTargetKind: invocationTargetKind, + MethodParameterBindings: methodParameterBindings); protected static UpsertRecurringScheduleRequest CreateSchedule( string scheduleKey, @@ -474,4 +561,20 @@ private sealed class InspectionContractService public Task RunAsync(CancellationToken cancellationToken) => Task.CompletedTask; } + + private sealed record InspectionPayload(string Name, int Count); + + private sealed class InspectionDependency + { + } + + private sealed class InspectionInvocationService + { + public Task RunAsync( + InspectionPayload payload, + InspectionDependency dependency, + IJobContext context, + CancellationToken cancellationToken) + => Task.CompletedTask; + } } diff --git a/test/Sheddueller.Tests/JobContextLoggingTests.cs b/test/Sheddueller.Tests/JobContextLoggingTests.cs new file mode 100644 index 0000000..918df4f --- /dev/null +++ b/test/Sheddueller.Tests/JobContextLoggingTests.cs @@ -0,0 +1,44 @@ +namespace Sheddueller.Tests; + +using Microsoft.Extensions.Logging; + +using Sheddueller.Runtime; +using Sheddueller.Storage; +using Sheddueller.Tests.Logging; + +using Shouldly; + +public sealed class JobContextLoggingTests +{ + [Fact] + public async Task Log_EventSinkFails_LogsDiagnosticAndDoesNotFailJob() + { + var jobId = Guid.NewGuid(); + using var logs = new TestLoggerProvider(); + using var loggerFactory = LoggerFactory.Create(builder => builder + .SetMinimumLevel(LogLevel.Trace) + .AddProvider(logs)); + var context = new JobContext( + jobId, + 1, + new FailingJobEventSink(), + loggerFactory.CreateLogger(), + CancellationToken.None); + + await context.LogAsync(JobLogLevel.Information, "hello"); + + var entry = logs.SingleByEventId(1040); + entry.Level.ShouldBe(LogLevel.Warning); + entry.Exception.ShouldBeOfType(); + entry.Properties["JobId"].ShouldBe(jobId); + entry.MessageTemplate.ShouldBe("Failed to append durable job event for job {JobId}."); + } + + private sealed class FailingJobEventSink : IJobEventSink + { + public ValueTask AppendAsync( + AppendJobEventRequest request, + CancellationToken cancellationToken = default) + => throw new InvalidOperationException("append failed"); + } +} diff --git a/test/Sheddueller.Tests/JobInvocationDisplayFormatterTests.cs b/test/Sheddueller.Tests/JobInvocationDisplayFormatterTests.cs new file mode 100644 index 0000000..1144edc --- /dev/null +++ b/test/Sheddueller.Tests/JobInvocationDisplayFormatterTests.cs @@ -0,0 +1,64 @@ +namespace Sheddueller.Tests; + +using Sheddueller.Inspection.Jobs; +using Sheddueller.Storage; + +using Shouldly; + +public sealed class JobInvocationDisplayFormatterTests +{ + [Fact] + public void Format_TwoArguments_KeepsCallOnSingleLine() + { + var call = JobInvocationDisplayFormatter.Format( + typeof(NestedService).AssemblyQualifiedName!, + "Run", + [ + new JobInvocationParameterInspection( + 0, + typeof(string).AssemblyQualifiedName!, + new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized), + "\"alpha\""), + new JobInvocationParameterInspection( + 1, + typeof(IJobContext).AssemblyQualifiedName!, + new JobMethodParameterBinding(JobMethodParameterBindingKind.JobContext)), + ]); + + call.ShouldBe("NestedService.Run(\"alpha\", Job.Context)"); + } + + [Fact] + public void Format_ThreeArguments_UsesMultilineLayout() + { + var call = JobInvocationDisplayFormatter.Format( + typeof(NestedService).AssemblyQualifiedName!, + "Run", + [ + new JobInvocationParameterInspection( + 0, + typeof(string).AssemblyQualifiedName!, + new JobMethodParameterBinding(JobMethodParameterBindingKind.Serialized), + "\"alpha\""), + new JobInvocationParameterInspection( + 1, + typeof(IJobContext).AssemblyQualifiedName!, + new JobMethodParameterBinding(JobMethodParameterBindingKind.JobContext)), + new JobInvocationParameterInspection( + 2, + typeof(CancellationToken).AssemblyQualifiedName!, + new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken)), + ]); + + call.ShouldBe(string.Join( + Environment.NewLine, + "NestedService.Run(", + " \"alpha\",", + " Job.Context,", + " CancellationToken)")); + } + + private sealed class NestedService + { + } +} diff --git a/test/Sheddueller.Tests/Logging/TestLoggerProvider.cs b/test/Sheddueller.Tests/Logging/TestLoggerProvider.cs new file mode 100644 index 0000000..2990d30 --- /dev/null +++ b/test/Sheddueller.Tests/Logging/TestLoggerProvider.cs @@ -0,0 +1,94 @@ +namespace Sheddueller.Tests.Logging; + +using System.Collections.Concurrent; + +using Microsoft.Extensions.Logging; + +internal sealed class TestLoggerProvider : ILoggerProvider +{ + private readonly ConcurrentQueue _entries = new(); + + public IReadOnlyList Entries + => [.. this._entries]; + + public ILogger CreateLogger(string categoryName) + => new TestLogger(categoryName, this._entries); + + public void Dispose() + { + } + + public TestLogEntry SingleByEventId(int eventId) + => this.Entries.Single(entry => entry.EventId.Id == eventId); + + public bool HasEventId(int eventId) + => this.Entries.Any(entry => entry.EventId.Id == eventId); + + private sealed class TestLogger( + string categoryName, + ConcurrentQueue entries) : ILogger + { + public IDisposable BeginScope(TState state) + where TState : notnull + => NullScope.Instance; + + public bool IsEnabled(LogLevel logLevel) + => true; + + public void Log( + LogLevel logLevel, + EventId eventId, + TState state, + Exception? exception, + Func formatter) + { + ArgumentNullException.ThrowIfNull(formatter); + + var properties = ReadProperties(state); + properties.TryGetValue("{OriginalFormat}", out var messageTemplate); + + entries.Enqueue(new TestLogEntry( + categoryName, + logLevel, + eventId, + messageTemplate as string, + formatter(state, exception), + exception, + properties)); + } + + private static Dictionary ReadProperties(TState state) + { + if (state is not IEnumerable> pairs) + { + return new Dictionary(StringComparer.Ordinal); + } + + var properties = new Dictionary(StringComparer.Ordinal); + foreach (var pair in pairs) + { + properties[pair.Key] = pair.Value; + } + + return properties; + } + } + + private sealed class NullScope : IDisposable + { + public static readonly NullScope Instance = new(); + + public void Dispose() + { + } + } +} + +internal sealed record TestLogEntry( + string CategoryName, + LogLevel Level, + EventId EventId, + string? MessageTemplate, + string RenderedMessage, + Exception? Exception, + IReadOnlyDictionary Properties); diff --git a/test/Sheddueller.Tests/RecurringScheduleManagerTests.cs b/test/Sheddueller.Tests/RecurringScheduleManagerTests.cs index 7b95ccb..4e46a2a 100644 --- a/test/Sheddueller.Tests/RecurringScheduleManagerTests.cs +++ b/test/Sheddueller.Tests/RecurringScheduleManagerTests.cs @@ -1,5 +1,6 @@ namespace Sheddueller.Tests; +using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using Sheddueller.Runtime; @@ -75,7 +76,8 @@ private static RecurringScheduleManager CreateManager( new SystemTextJsonJobPayloadSerializer(), Options.Create(options ?? new ShedduellerOptions()), TimeProvider.System, - wakeSignal); + wakeSignal, + NullLogger.Instance); private sealed class RecordingWakeSignal : IShedduellerWakeSignal { diff --git a/test/Sheddueller.Tests/RegistrationTests.cs b/test/Sheddueller.Tests/RegistrationTests.cs index a09ef9f..0f89f0a 100644 --- a/test/Sheddueller.Tests/RegistrationTests.cs +++ b/test/Sheddueller.Tests/RegistrationTests.cs @@ -2,6 +2,7 @@ namespace Sheddueller.Tests; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Sheddueller.Runtime; @@ -30,6 +31,7 @@ public void AddSheddueller_CustomProvider_RegistersCoreServices() provider.GetRequiredService().ShouldNotBeNull(); provider.GetRequiredService().ShouldBeSameAs(provider.GetRequiredService()); provider.GetRequiredService().ShouldBeSameAs(serializer); + provider.GetRequiredService().ShouldNotBeNull(); provider.GetServices().ShouldBeEmpty(); provider.GetServices().Count().ShouldBe(1); } diff --git a/test/Sheddueller.Worker.Tests/Sheddueller.Worker.Tests.csproj b/test/Sheddueller.Worker.Tests/Sheddueller.Worker.Tests.csproj index 8287aad..858ad30 100644 --- a/test/Sheddueller.Worker.Tests/Sheddueller.Worker.Tests.csproj +++ b/test/Sheddueller.Worker.Tests/Sheddueller.Worker.Tests.csproj @@ -3,4 +3,8 @@ + + + + diff --git a/test/Sheddueller.Worker.Tests/WorkerLoggingTests.cs b/test/Sheddueller.Worker.Tests/WorkerLoggingTests.cs new file mode 100644 index 0000000..464bceb --- /dev/null +++ b/test/Sheddueller.Worker.Tests/WorkerLoggingTests.cs @@ -0,0 +1,206 @@ +namespace Sheddueller.Worker.Tests; + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +using Sheddueller.Serialization; +using Sheddueller.Storage; +using Sheddueller.Tests.Logging; +using Sheddueller.Worker.Internal; + +using Shouldly; + +public sealed class WorkerLoggingTests +{ + [Fact] + public async Task JobExecution_Exception_LogsFailureAndMarksJobFailed() + { + using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var job = CreateClaimedJob(); + var store = new SingleClaimJobStore(job); + using var logs = new TestLoggerProvider(); + var services = new ServiceCollection(); + services.AddLogging(builder => builder + .SetMinimumLevel(LogLevel.Trace) + .AddProvider(logs)); + services.AddSingleton(store); + services.AddSingleton(serviceProvider => serviceProvider.GetRequiredService()); + services.AddTransient(); + services.AddShedduellerWorker(builder => builder.ConfigureOptions(options => + { + options.NodeId = "worker-a"; + options.IdlePollingInterval = TimeSpan.FromMilliseconds(10); + options.HeartbeatInterval = TimeSpan.FromSeconds(5); + options.LeaseDuration = TimeSpan.FromSeconds(30); + })); + await using var provider = services.BuildServiceProvider(); + var worker = provider.GetServices().OfType().Single(); + + await worker.StartAsync(cancellationTokenSource.Token); + var failed = await store.Failed.Task.WaitAsync(cancellationTokenSource.Token); + await worker.StopAsync(cancellationTokenSource.Token); + + failed.JobId.ShouldBe(job.JobId); + failed.Failure.ExceptionType.ShouldBe(typeof(InvalidOperationException).FullName); + + var entry = logs.SingleByEventId(1121); + entry.Level.ShouldBe(LogLevel.Error); + entry.Exception.ShouldBeOfType(); + entry.Properties["JobId"].ShouldBe(job.JobId); + entry.Properties["AttemptNumber"].ShouldBe(job.AttemptCount); + entry.Properties["NodeId"].ShouldBe("worker-a"); + entry.MessageTemplate.ShouldBe("Job {JobId} attempt {AttemptNumber} failed on node {NodeId}."); + } + + private static ClaimedJob CreateClaimedJob() + => new( + Guid.NewGuid(), + EnqueueSequence: 1, + Priority: 0, + ServiceType: typeof(FailingJob).AssemblyQualifiedName!, + MethodName: nameof(FailingJob.FailAsync), + MethodParameterTypes: [typeof(CancellationToken).AssemblyQualifiedName!], + SerializedArguments: new SerializedJobPayload(SystemTextJsonJobPayloadSerializer.JsonContentType, "[]"u8.ToArray()), + ConcurrencyGroupKeys: [], + AttemptCount: 1, + MaxAttempts: 1, + LeaseToken: Guid.NewGuid(), + LeaseExpiresAtUtc: DateTimeOffset.UtcNow.AddSeconds(30), + RetryBackoffKind: null, + RetryBaseDelay: null, + RetryMaxDelay: null, + SourceScheduleKey: null, + ScheduledFireAtUtc: null, + MethodParameterBindings: [new JobMethodParameterBinding(JobMethodParameterBindingKind.CancellationToken)]); + + private sealed class FailingJob + { + public Task FailAsync(CancellationToken cancellationToken) + => throw new InvalidOperationException("job failed"); + } + + private sealed class SingleClaimJobStore(ClaimedJob job) : IJobStore + { + private int _claimed; + + public TaskCompletionSource Failed { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public ValueTask EnqueueAsync( + EnqueueJobRequest request, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask> EnqueueManyAsync( + IReadOnlyList requests, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask TryClaimNextAsync( + ClaimJobRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult( + Interlocked.Exchange(ref this._claimed, 1) == 0 + ? new ClaimJobResult.Claimed(job) + : new ClaimJobResult.NoJobAvailable()); + + public ValueTask MarkCompletedAsync( + CompleteJobRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(true); + + public ValueTask MarkFailedAsync( + FailJobRequest request, + CancellationToken cancellationToken = default) + { + this.Failed.TrySetResult(request); + return ValueTask.FromResult(true); + } + + public ValueTask RenewLeaseAsync( + RenewLeaseRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(true); + + public ValueTask ReleaseJobAsync( + ReleaseJobRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(true); + + public ValueTask RecoverExpiredLeasesAsync( + RecoverExpiredLeasesRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(0); + + public ValueTask CancelAsync( + CancelJobRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(JobCancellationResult.NotFound); + + public ValueTask GetCancellationRequestedAtAsync( + JobCancellationStatusRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(null); + + public ValueTask MarkCancellationObservedAsync( + ObserveJobCancellationRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(true); + + public ValueTask RecordWorkerNodeHeartbeatAsync( + WorkerNodeHeartbeatRequest request, + CancellationToken cancellationToken = default) + => ValueTask.CompletedTask; + + public ValueTask SetConcurrencyLimitAsync( + SetConcurrencyLimitRequest request, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask GetConfiguredConcurrencyLimitAsync( + string groupKey, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask CreateOrUpdateRecurringScheduleAsync( + UpsertRecurringScheduleRequest request, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask TriggerRecurringScheduleAsync( + TriggerRecurringScheduleRequest request, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask DeleteRecurringScheduleAsync( + string scheduleKey, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask PauseRecurringScheduleAsync( + string scheduleKey, + DateTimeOffset pausedAtUtc, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask ResumeRecurringScheduleAsync( + string scheduleKey, + DateTimeOffset resumedAtUtc, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask GetRecurringScheduleAsync( + string scheduleKey, + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask> ListRecurringSchedulesAsync( + CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public ValueTask MaterializeDueRecurringSchedulesAsync( + MaterializeDueRecurringSchedulesRequest request, + CancellationToken cancellationToken = default) + => ValueTask.FromResult(0); + } +}