Skip to content

Commit eeea43e

Browse files
committed
fix: shutdown
1 parent 722fc79 commit eeea43e

14 files changed

Lines changed: 243 additions & 42 deletions

File tree

src/Directory.Build.props

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
<Project>
22

33
<PropertyGroup>
4-
<Version>1.0.0-alpha.24</Version>
5-
<PackageVersion>1.0.0-alpha.24</PackageVersion>
4+
<Version>1.0.0-alpha.25</Version>
5+
<PackageVersion>1.0.0-alpha.25</PackageVersion>
66
<Authors>Zapto</Authors>
77
<RepositoryUrl>https://github.com/zapto-dev/Mediator</RepositoryUrl>
88
<Copyright>Copyright © 2025 Zapto</Copyright>

src/Mediator.DependencyInjection/DefaultBackgroundPublisher.cs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,49 +18,62 @@ public DefaultBackgroundPublisher(IServiceScopeFactory scopeFactory)
1818
/// <inheritdoc />
1919
public void Publish(object notification)
2020
{
21-
_ = Task.Run(async () =>
21+
ObserveTask(Task.Run(async () =>
2222
{
2323
using var scope = _scopeFactory.CreateScope();
2424
var mediator = scope.ServiceProvider.GetRequiredService<IPublisher>();
2525

2626
await mediator.Publish(notification, CancellationToken.None);
27-
}, CancellationToken.None);
27+
}, CancellationToken.None));
2828
}
2929

3030
/// <inheritdoc />
3131
public void Publish(MediatorNamespace ns, object notification)
3232
{
33-
_ = Task.Run(async () =>
33+
ObserveTask(Task.Run(async () =>
3434
{
3535
using var scope = _scopeFactory.CreateScope();
3636
var mediator = scope.ServiceProvider.GetRequiredService<IPublisher>();
3737

3838
await mediator.Publish(ns, notification, CancellationToken.None);
39-
}, CancellationToken.None);
39+
}, CancellationToken.None));
4040
}
4141

4242
/// <inheritdoc />
4343
public void Publish<TNotification>(TNotification notification) where TNotification : INotification
4444
{
45-
_ = Task.Run(async () =>
45+
ObserveTask(Task.Run(async () =>
4646
{
4747
using var scope = _scopeFactory.CreateScope();
4848
var mediator = scope.ServiceProvider.GetRequiredService<IPublisher>();
4949

5050
await mediator.Publish(notification, CancellationToken.None);
51-
}, CancellationToken.None);
51+
}, CancellationToken.None));
5252
}
5353

5454
/// <inheritdoc />
5555
public void Publish<TNotification>(MediatorNamespace ns, TNotification notification)
5656
where TNotification : INotification
5757
{
58-
_ = Task.Run(async () =>
58+
ObserveTask(Task.Run(async () =>
5959
{
6060
using var scope = _scopeFactory.CreateScope();
6161
var mediator = scope.ServiceProvider.GetRequiredService<IPublisher>();
6262

6363
await mediator.Publish(ns, notification, CancellationToken.None);
64-
}, CancellationToken.None);
64+
}, CancellationToken.None));
65+
}
66+
67+
/// <summary>
68+
/// Observes faulted fire-and-forget tasks so they do not trigger
69+
/// <see cref="TaskScheduler.UnobservedTaskException"/> and crash the process.
70+
/// </summary>
71+
private static void ObserveTask(Task task)
72+
{
73+
task.ContinueWith(
74+
static t => GC.KeepAlive(t.Exception),
75+
CancellationToken.None,
76+
TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
77+
TaskScheduler.Default);
6578
}
6679
}

src/Mediator.DependencyInjection/Generic/Handlers/GenericNotificationHandler.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ internal interface IHandlerRegistration
2424
{
2525
INotificationCache Owner { get; }
2626

27+
/// <summary>
28+
/// Set to <c>true</c> under <see cref="INotificationCache.Lock"/> when the
29+
/// handler is disposed. Checked (without lock) as a best-effort guard in
30+
/// <see cref="InvokeAsync"/> to skip already-disposed handlers.
31+
/// </summary>
32+
bool IsDisposed { get; set; }
33+
2734
ValueTask InvokeAsync(IServiceProvider provider, object notification, CancellationToken cancellationToken);
2835
}
2936

@@ -53,7 +60,8 @@ public GenericNotificationCache(IEnumerable<GenericNotificationRegistration> reg
5360
}
5461
}
5562

56-
public List<Type>? HandlerTypes { get; set; }
63+
private List<Type>? _handlerTypes;
64+
public List<Type>? HandlerTypes { get => Volatile.Read(ref _handlerTypes); set => Volatile.Write(ref _handlerTypes, value); }
5765

5866
public List<GenericNotificationRegistration> MatchingRegistrations { get; }
5967

src/Mediator.DependencyInjection/Generic/Handlers/GenericRequestHandler.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ public GenericRequestCache(IEnumerable<GenericRequestRegistration> registrations
4343
}
4444
}
4545

46-
public Type? RequestHandlerType { get; set; }
46+
private Type? _requestHandlerType;
47+
public Type? RequestHandlerType { get => Volatile.Read(ref _requestHandlerType); set => Volatile.Write(ref _requestHandlerType, value); }
4748

4849
public List<GenericRequestRegistration> MatchingRegistrations { get; }
4950
}
@@ -67,7 +68,8 @@ public GenericRequestCache(IEnumerable<GenericRequestRegistration> registrations
6768
}
6869
}
6970

70-
public Type? RequestHandlerType { get; set; }
71+
private Type? _requestHandlerType;
72+
public Type? RequestHandlerType { get => Volatile.Read(ref _requestHandlerType); set => Volatile.Write(ref _requestHandlerType, value); }
7173

7274
public List<GenericRequestRegistration> MatchingRegistrations { get; }
7375
}

src/Mediator.DependencyInjection/Generic/Handlers/GenericStreamRequestHandler.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ public GenericStreamRequestCache(IEnumerable<GenericStreamRequestRegistration> r
4141
}
4242
}
4343

44-
public Type? RequestHandlerType { get; set; }
44+
private Type? _requestHandlerType;
45+
public Type? RequestHandlerType { get => Volatile.Read(ref _requestHandlerType); set => Volatile.Write(ref _requestHandlerType, value); }
4546

4647
public List<GenericStreamRequestRegistration> MatchingRegistrations { get; }
4748
}

src/Mediator.DependencyInjection/Generic/NotificationAttributeHandler.cs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,13 @@ static NotificationAttributeHandler()
1919

2020
if (attribute == null) continue;
2121

22-
var notificationType = method.GetParameters()[0].ParameterType;
22+
var parameters = method.GetParameters();
23+
if (parameters.Length == 0)
24+
{
25+
throw new InvalidOperationException($"Method '{method.Name}' on '{typeof(T).FullName}' is decorated with [NotificationHandler] but has no parameters. The first parameter must be the notification type.");
26+
}
27+
28+
var notificationType = parameters[0].ParameterType;
2329

2430
if (!typeof(INotification).IsAssignableFrom(notificationType))
2531
{
@@ -45,8 +51,14 @@ public static IDisposable RegisterHandlers(IServiceProvider serviceProvider, obj
4551
var registration = new HandlerRegistration(cache, invoker, handler, middleware);
4652

4753
cache.Lock.Wait();
48-
cache.Registrations.Add(registration);
49-
cache.Lock.Release();
54+
try
55+
{
56+
cache.Registrations.Add(registration);
57+
}
58+
finally
59+
{
60+
cache.Lock.Release();
61+
}
5062

5163
registrations.Add(registration);
5264
}
@@ -70,8 +82,15 @@ public void Dispose()
7082
var owner = registration.Owner;
7183

7284
owner.Lock.Wait();
73-
owner.Registrations.Remove(registration);
74-
owner.Lock.Release();
85+
try
86+
{
87+
registration.IsDisposed = true;
88+
owner.Registrations.Remove(registration);
89+
}
90+
finally
91+
{
92+
owner.Lock.Release();
93+
}
7594
}
7695
}
7796
}
@@ -97,7 +116,14 @@ public HandlerRegistration(
97116

98117
public Func<T, IServiceProvider, object, CancellationToken, ValueTask> Invoke { get; }
99118

100-
public bool IsDisposed { get; set; }
119+
private bool _isDisposed;
120+
121+
public bool IsDisposed
122+
{
123+
get => Volatile.Read(ref _isDisposed);
124+
// Written under Owner.Lock, but Volatile.Write ensures visibility for lock-free reads in InvokeAsync
125+
set => Volatile.Write(ref _isDisposed, value);
126+
}
101127

102128
public ValueTask InvokeAsync(IServiceProvider provider, object notification, CancellationToken cancellationToken)
103129
{

src/Mediator.DependencyInjection/ServiceProviderMediator.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public IDisposable RegisterNotificationHandler(object handler, Func<Func<Task>,
7979
{
8080
invokeAsync = cb =>
8181
{
82-
_ = Task.Run(() => invoker(cb));
82+
_ = invoker(cb);
8383
return Task.CompletedTask;
8484
};
8585
}

src/Mediator.Hosting/MediatorExtensions.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using Microsoft.Extensions.DependencyInjection;
3+
using Microsoft.Extensions.DependencyInjection.Extensions;
34
using Zapto.Mediator.Options;
45
using Zapto.Mediator.Services;
56

@@ -16,7 +17,7 @@ public static IMediatorBuilder AddHostingBackgroundScheduler(this IMediatorBuild
1617

1718
mediatorBuilder.Services.AddSingleton<BackgroundQueueService>();
1819
mediatorBuilder.Services.AddHostedService<BackgroundQueueHostedService>();
19-
mediatorBuilder.Services.AddSingleton<IBackgroundPublisher, BackgroundPublisher>();
20+
mediatorBuilder.Services.Replace(ServiceDescriptor.Singleton<IBackgroundPublisher, BackgroundPublisher>());
2021

2122
if (configure is not null)
2223
{

src/Mediator.Hosting/Services/BackgroundQueueService.cs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,8 @@
22
using System.Collections.Concurrent;
33
using System.Collections.Generic;
44
using System.Linq;
5-
using System.Security.Cryptography;
65
using System.Threading;
76
using System.Threading.Tasks;
8-
using MediatR;
97
using Microsoft.Extensions.Hosting;
108
using Microsoft.Extensions.Logging;
119
using Microsoft.Extensions.Options;
@@ -36,18 +34,22 @@ public void QueueBackgroundWorkItem(Func<Task> workItem, object notification)
3634
throw new OperationCanceledException("Cannot schedule work item since the application is stopping");
3735
}
3836

39-
_workItems.Enqueue(workItem);
40-
4137
lock (_workers)
4238
{
39+
_workItems.Enqueue(workItem);
40+
4341
if (_workers.Count < _options.Value.MaxDegreeOfParallelism)
4442
{
4543
var worker = new Worker
4644
{
4745
Notification = notification
4846
};
4947

50-
worker.Task = Task.Factory.StartNew(() => ProcessBackgroundWorkItem(worker), TaskCreationOptions.LongRunning);
48+
worker.Task = Task.Factory.StartNew(
49+
() => ProcessBackgroundWorkItem(worker),
50+
CancellationToken.None,
51+
TaskCreationOptions.None,
52+
TaskScheduler.Default).Unwrap();
5153
_workers.Add(worker);
5254
}
5355
}
@@ -93,9 +95,13 @@ public async Task WaitForBackgroundTasksAsync(CancellationToken cancellationToke
9395
resultingTask = await Task.WhenAny(task, tcs.Task);
9496
#endif
9597
}
98+
catch (OperationCanceledException)
99+
{
100+
throw;
101+
}
96102
catch
97103
{
98-
// ignore
104+
// worker exceptions are already logged at the source; ignore here
99105
}
100106

101107
#if !NET
@@ -127,6 +133,7 @@ private async Task ProcessBackgroundWorkItem(Worker worker)
127133
// Process next work item
128134
if (!_workItems.TryDequeue(out var workItem))
129135
{
136+
await Task.Yield();
130137
continue;
131138
}
132139

src/Mediator.SourceGenerator/SenderGenerator.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ namespace Zapto.Mediator.Generator;
1414
[Generator]
1515
public class SenderGenerator : IIncrementalGenerator
1616
{
17-
private bool _generateAssemblyInfo;
17+
private readonly bool _generateAssemblyInfo;
1818

1919
private static readonly string[] Interfaces =
2020
[

0 commit comments

Comments
 (0)