From 761d21b795e88981e9e89c08d62a13ec99ef8996 Mon Sep 17 00:00:00 2001 From: joshkempner Date: Tue, 24 Mar 2026 12:20:50 -0400 Subject: [PATCH 1/2] Adds async versions of ReadModelBase.Start() Adds a `StartAsync` method for each overload of `Start` in `ReadModelBase`. These methods perform event playback on a task pool thread. The class exposes an awaitable `IsLive` property that completes when all async playback Tasks have completed. --- .../when_using_read_model_base_with_reader.cs | 120 +++-- .../StreamStore/ReadModelBase.cs | 436 +++++++++++------- 2 files changed, 360 insertions(+), 196 deletions(-) diff --git a/src/ReactiveDomain.Foundation.Tests/when_using_read_model_base_with_reader.cs b/src/ReactiveDomain.Foundation.Tests/when_using_read_model_base_with_reader.cs index fcf453c5..62f2c314 100644 --- a/src/ReactiveDomain.Foundation.Tests/when_using_read_model_base_with_reader.cs +++ b/src/ReactiveDomain.Foundation.Tests/when_using_read_model_base_with_reader.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using ReactiveDomain.Messaging; using ReactiveDomain.Messaging.Bus; using ReactiveDomain.Testing; @@ -10,12 +11,11 @@ namespace ReactiveDomain.Foundation.Tests; public class when_using_read_model_base_with_reader : ReadModelBase, IHandle, - IClassFixture { + IClassFixture +{ private static IStreamStoreConnection _conn; - private static readonly IEventSerializer Serializer = - new JsonMessageSerializer(); - private static readonly IStreamNameBuilder Namer = - new PrefixedCamelCaseStreamNameBuilder(nameof(when_using_read_model_base)); + private static readonly JsonMessageSerializer Serializer = new(); + private static readonly PrefixedCamelCaseStreamNameBuilder Namer = new(nameof(when_using_read_model_base)); private readonly string _stream1; private readonly string _stream2; @@ -23,7 +23,8 @@ public class when_using_read_model_base_with_reader : public when_using_read_model_base_with_reader(StreamStoreConnectionFixture fixture) : base(nameof(when_using_read_model_base), - new ConfiguredConnection(fixture.Connection, Namer, Serializer)) { + new ConfiguredConnection(fixture.Connection, Namer, Serializer)) + { _conn = fixture.Connection; _conn.Connect(); @@ -40,18 +41,22 @@ public when_using_read_model_base_with_reader(StreamStoreConnectionFixture fixtu _conn.TryConfirmStream(Namer.GenerateForCategory(typeof(TestAggregate)), 20); } - private void AppendEvents( + private static void AppendEvents( int numEventsToBeSent, IStreamStoreConnection conn, string streamName, - int value) { - for (int evtNumber = 0; evtNumber < numEventsToBeSent; evtNumber++) { + int value) + { + for (int evtNumber = 0; evtNumber < numEventsToBeSent; evtNumber++) + { var evt = new ReadModelTestEvent(evtNumber, value); conn.AppendToStream(streamName, ExpectedVersion.Any, null, Serializer.Serialize(evt)); } } + [Fact] - public void can_start_streams_by_aggregate() { + public void can_start_streams_by_aggregate() + { var aggId = Guid.NewGuid(); var s1 = Namer.GenerateForAggregate(typeof(TestAggregate), aggId); AppendEvents(1, _conn, s1, 7); @@ -59,20 +64,34 @@ public void can_start_streams_by_aggregate() { AssertEx.IsOrBecomesTrue(() => Count == 1, 1000, msg: $"Expected 1 got {Count}"); AssertEx.IsOrBecomesTrue(() => Sum == 7); } + [Fact] - public void can_start_streams_by_aggregate_category() { + public async Task can_start_streams_async_by_aggregate() { + var aggId = Guid.NewGuid(); + var s1 = Namer.GenerateForAggregate(typeof(TestAggregate), aggId); + AppendEvents(1, _conn, s1, 7); + StartAsync(aggId); + await IsLive; + Assert.Equal(1, Count); + Assert.Equal(7, Sum); + } + [Fact] + public async Task can_start_streams_async_by_aggregate_category() { var s1 = Namer.GenerateForAggregate(typeof(ReadModelTestCategoryAggregate), Guid.NewGuid()); AppendEvents(1, _conn, s1, 7); var s2 = Namer.GenerateForAggregate(typeof(ReadModelTestCategoryAggregate), Guid.NewGuid()); AppendEvents(1, _conn, s2, 5); - Start(null, true); + StartAsync(); - AssertEx.IsOrBecomesTrue(() => Count == 2, 1000, msg: $"Expected 2 got {Count}"); - AssertEx.IsOrBecomesTrue(() => Sum == 12); + await IsLive; + Assert.Equal(2, Count); + Assert.Equal(12, Sum); } + [Fact] - public void can_read_one_stream() { + public void can_read_one_stream() + { Start(_stream1); AssertEx.IsOrBecomesTrue(() => Count == 10, 1000, msg: $"Expected 10 got {Count}"); AssertEx.IsOrBecomesTrue(() => Sum == 20); @@ -80,8 +99,10 @@ public void can_read_one_stream() { Assert.Equal(_stream1, GetCheckpoint()[0].Item1); Assert.Equal(9, GetCheckpoint()[0].Item2); } + [Fact] - public void can_read_two_streams() { + public void can_read_two_streams() + { Start(_stream1); Start(_stream2); AssertEx.IsOrBecomesTrue(() => Count == 20, 1000, msg: $"Expected 20 got {Count}"); @@ -92,24 +113,36 @@ public void can_read_two_streams() { Assert.Equal(_stream2, GetCheckpoint()[1].Item1); Assert.Equal(9, GetCheckpoint()[1].Item2); } + [Fact] - public void can_wait_for_one_stream_to_go_live() { - Start(_stream1, null, true); - AssertEx.IsOrBecomesTrue(() => Count == 10, 100, msg: $"Expected 10 got {Count}"); - AssertEx.IsOrBecomesTrue(() => Sum == 20, 100); + public async Task is_live_task_returns_immediately_when_no_streams_are_played_back_async() + { + await IsLive; + Assert.Equal(0, Count); } + [Fact] - public void can_wait_for_two_streams_to_go_live() { - Start(_stream1, null, true); - AssertEx.IsOrBecomesTrue(() => Count == 10, 100, msg: $"Expected 10 got {Count}"); - AssertEx.IsOrBecomesTrue(() => Sum == 20, 100); + public async Task can_await_one_stream_going_live() + { + StartAsync(_stream1); + await IsLive; + Assert.Equal(10, Count); + Assert.Equal(20, Sum); + } - Start(_stream2, null, true); - AssertEx.IsOrBecomesTrue(() => Count == 20, 100, msg: $"Expected 20 got {Count}"); - AssertEx.IsOrBecomesTrue(() => Sum == 50, 100); + [Fact] + public async Task can_await_two_streams_going_live() + { + StartAsync(_stream1); + StartAsync(_stream2); + await IsLive; + Assert.Equal(20, Count); + Assert.Equal(50, Sum); } + [Fact] - public void can_listen_to_one_stream() { + public void can_listen_to_one_stream() + { Start(_stream1); AssertEx.IsOrBecomesTrue(() => Count == 10, 1000, msg: $"Expected 10 got {Count}"); AssertEx.IsOrBecomesTrue(() => Sum == 20); @@ -119,11 +152,14 @@ public void can_listen_to_one_stream() { AssertEx.IsOrBecomesTrue(() => Sum == 70); //confirm checkpoints Assert.Equal(_stream1, GetCheckpoint()[0].Item1); - Assert.Equal(19, GetCheckpoint()[0].Item2); Assert.Equal(_stream1, GetCheckpoint()[0].Item1); + Assert.Equal(19, GetCheckpoint()[0].Item2); + Assert.Equal(_stream1, GetCheckpoint()[0].Item1); Assert.Equal(19, GetCheckpoint()[0].Item2); } + [Fact] - public void can_listen_to_two_streams() { + public void can_listen_to_two_streams() + { Start(_stream1); Start(_stream2); AssertEx.IsOrBecomesTrue(() => Count == 20, 1000, msg: $"Expected 20 got {Count}"); @@ -139,10 +175,12 @@ public void can_listen_to_two_streams() { Assert.Equal(_stream2, GetCheckpoint()[1].Item1); Assert.Equal(19, GetCheckpoint()[1].Item2); } + [Fact] - public void can_use_checkpoint_on_one_stream() { + public void can_use_checkpoint_on_one_stream() + { //restore state - var checkPoint = 8L;//Zero based, ignore the first 9 events + var checkPoint = 8L; //Zero based, ignore the first 9 events Count = 9; Sum = 18; //start at the checkpoint @@ -158,11 +196,13 @@ public void can_use_checkpoint_on_one_stream() { Assert.Equal(_stream1, GetCheckpoint()[0].Item1); Assert.Equal(19, GetCheckpoint()[0].Item2); } + [Fact] - public void can_use_checkpoint_on_two_streams() { + public void can_use_checkpoint_on_two_streams() + { //restore state - var checkPoint1 = 8L;//Zero based, ignore the first 9 events - var checkPoint2 = 5L;//Zero based, ignore the first 6 events + var checkPoint1 = 8L; //Zero based, ignore the first 9 events + var checkPoint2 = 5L; //Zero based, ignore the first 6 events Count = (9) + (6); Sum = (9 * 2) + (6 * 3); Start(_stream1, checkPoint1); @@ -181,8 +221,10 @@ public void can_use_checkpoint_on_two_streams() { Assert.Equal(_stream2, GetCheckpoint()[1].Item1); Assert.Equal(19, GetCheckpoint()[1].Item2); } + [Fact] - public void can_listen_to_the_same_stream_twice() { + public void can_listen_to_the_same_stream_twice() + { Assert.Equal(0, Count); //weird but true //n.b. Don't do this on purpose @@ -199,10 +241,14 @@ public void can_listen_to_the_same_stream_twice() { public long Sum { get; private set; } public long Count { get; private set; } - void IHandle.Handle(ReadModelTestEvent @event) { + + void IHandle.Handle(ReadModelTestEvent @event) + { Sum += @event.Value; Count++; } + public record ReadModelTestEvent(int Number, int Value) : Event; + public class ReadModelTestCategoryAggregate : EventDrivenStateMachine; } \ No newline at end of file diff --git a/src/ReactiveDomain.Foundation/StreamStore/ReadModelBase.cs b/src/ReactiveDomain.Foundation/StreamStore/ReadModelBase.cs index e2650b89..8308ddcc 100644 --- a/src/ReactiveDomain.Foundation/StreamStore/ReadModelBase.cs +++ b/src/ReactiveDomain.Foundation/StreamStore/ReadModelBase.cs @@ -1,202 +1,320 @@ using System; using System.Collections.Generic; -using System.Diagnostics.SymbolStore; using System.Linq; using System.Threading; +using System.Threading.Tasks; using ReactiveDomain.Messaging; using ReactiveDomain.Messaging.Bus; using ReactiveDomain.Util; // ReSharper disable once CheckNamespace -namespace ReactiveDomain.Foundation +namespace ReactiveDomain.Foundation; + +public abstract class ReadModelBase : + IHandle, + IHandle, + IPublisher, + IDisposable { - public abstract class ReadModelBase : - IHandle, - IHandle, - IPublisher, - IDisposable - { - private readonly Func _getListener; - private readonly List _listeners; - private readonly Func _getReader; - private readonly InMemoryBus _bus; - private readonly QueuedHandler _queue; - public int MessageCount => _queue.MessageCount; - public bool Idle => _queue.Idle; - - /// - /// ReaderLock locks the event handler and can be used when reading the model - /// to ensure model state is unchanged during read. - /// The lock should *not* be used in Handle methods as they are inside the lock already by default. - /// - protected readonly object ReaderLock = new object(); - - /// - /// The version is equal to the number of messages passed to the read model. - /// The version is incremented after all handlers have been processed. - /// The number of handlers (including none) will not impact the version. - /// This can be used to ensure read model state for tests. This is *not* - /// the same as the version of any particular stream being read. This can - /// include , - /// which may result in the Version being 1 greater than otherwise expected. - /// - public int Version { get; private set; } - - /// - /// Creates a read model using the provided stream store connection. Reads existing events using a - /// reader, then transitions to a listener for live events. - /// - /// The name of the read model. Also used as the names of the listener and reader. - /// A connection to a stream store. - protected ReadModelBase(string name, IConfiguredConnection connection) + private readonly Func _getListener; + private readonly List _listeners; + private readonly Func _getReader; + private readonly List _readTasks = []; + private readonly InMemoryBus _bus; + private readonly QueuedHandler _queue; + public int MessageCount => _queue.MessageCount; + public bool Idle => _queue.Idle; + + /// + /// ReaderLock locks the event handler and can be used when reading the model + /// to ensure model state is unchanged during read. + /// The lock should *not* be used in Handle methods as they are inside the lock already by default. + /// + protected readonly object ReaderLock = new(); + + /// + /// The version is equal to the number of messages passed to the read model. + /// The version is incremented after all handlers have been processed. + /// The number of handlers (including none) will not impact the version. + /// This can be used to ensure read model state for tests. This is *not* + /// the same as the version of any particular stream being read. This can + /// include , + /// which may result in the Version being 1 greater than otherwise expected. + /// + public int Version { get; private set; } + + /// + /// Gets a task that completes when all streams started using a StartAsync overload are live. + /// + /// The returned task represents the aggregate completion of all underlying read tasks. Awaiting + /// this property allows callers to determine when all read operations are complete. + public Task IsLive => Task.WhenAll(_readTasks); + + /// + /// Creates a read model using the provided stream store connection. Reads existing events using a + /// reader, then transitions to a listener for live events. + /// + /// The name of the read model. Also used as the names of the listener and reader. + /// A connection to a stream store. + protected ReadModelBase(string name, IConfiguredConnection connection) + { + Ensure.NotNull(connection, nameof(connection)); + _getReader = () => connection.GetReader(name, Handle); + _getListener = () => connection.GetListener(name); + _listeners = []; + _bus = new InMemoryBus($"{nameof(ReadModelBase)}:{name} bus", false); + _queue = new QueuedHandler(new AdHocHandler(DequeueMessage), + $"{nameof(ReadModelBase)}:{name} queue"); + _queue.Start(); + } + + /// + /// Every message handled by the read model will pass through here. + /// + private void DequeueMessage(IMessage message) + { + lock (ReaderLock) { - Ensure.NotNull(connection, nameof(connection)); - _getReader = () => connection.GetReader(name, Handle); - _getListener = () => connection.GetListener(name); - _listeners = new List(); - _bus = new InMemoryBus($"{nameof(ReadModelBase)}:{name} bus", false); - _queue = new QueuedHandler(new AdHocHandler(DequeueMessage), $"{nameof(ReadModelBase)}:{name} queue"); - _queue.Start(); + _bus.Handle(message); + Version++; } + } - /// - /// Every message handled by the read model will pass through here. - /// - private void DequeueMessage(IMessage message) + private IListener AddNewListener() + { + var l = _getListener(); + lock (_listeners) { - lock (ReaderLock) - { - _bus.Handle(message); - Version++; - } + _listeners.Add(l); } - private IListener AddNewListener() + l.EventStream.SubscribeToAll(_queue); + return l; + } + + /// + /// Get the positions of all listeners. + /// + /// A list of Tuples of listener names and checkpoints. + public List> GetCheckpoint() + { + lock (_listeners) { - var l = _getListener(); - lock (_listeners) - { - _listeners.Add(l); - } - l.EventStream.SubscribeToAll(_queue); - return l; + return _listeners.Select(l => new Tuple(l.StreamName, l.Position)).ToList(); } + } + + /// + /// The stream of events that handlers should subscribe to. + /// + public ISubscriber EventStream => _bus; - /// - /// Get the positions of all listeners. - /// - /// A list of Tuples of listener names and checkpoints. - public List> GetCheckpoint() + /// + /// Start playback of a named stream. + /// + /// The name of the stream to play back. + /// The event to start with. + /// If true, blocks returning from this method until the listener has caught up. + ///
+ /// This parameter is deprecated and will be removed in a future release. Use and + /// await instead. + /// ensure the stream exists on start + /// Cancellation token to cancel waiting if blockUntilLive is true. + public void Start(string stream, long? checkpoint = null, bool blockUntilLive = false, + bool validateStream = false, CancellationToken cancelWaitToken = default) + { + if (_getReader != null) { - lock (_listeners) - { - return _listeners.Select(l => new Tuple(l.StreamName, l.Position)).ToList(); - } + using var reader = _getReader(); + reader.Read(stream, () => Idle, checkpoint); + checkpoint = reader.Position ?? checkpoint; } - /// - /// The stream of events that handlers should subscribe to. - /// - public ISubscriber EventStream => _bus; - - /// - /// Start playback of a named stream. - /// - /// The name of the stream to play back. - /// The event to start with. - /// If true, blocks returning from this method until the listener has caught up. - /// Cancellation token to cancel waiting if blockUntilLive is true. - public void Start(string stream, long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default) + AddNewListener().Start(stream, checkpoint, blockUntilLive, validateStream, cancelWaitToken); + } + + /// + /// Start playback of a named stream on a task pool thread. + /// Await to know when all streams are caught up. + /// + /// The name of the stream to play back. + /// The event to start with. + /// ensure the stream exists on start + /// Cancellation token to cancel waiting if blockUntilLive is true. + public void StartAsync(string stream, long? checkpoint = null, bool validateStream = false, + CancellationToken cancelWaitToken = default) + { + _readTasks.Add(Task.Run(() => { if (_getReader != null) { - using (var reader = _getReader()) - { - reader.Read(stream, () => Idle, checkpoint); - checkpoint = reader.Position ?? checkpoint; - } + using var reader = _getReader(); + reader.Read(stream, () => Idle, checkpoint); + checkpoint = reader.Position ?? checkpoint; } - AddNewListener().Start(stream, checkpoint, blockUntilLive, validateStream, cancelWaitToken); + + AddNewListener().Start(stream, checkpoint, false, validateStream, cancelWaitToken); + }, cancelWaitToken)); + } + + /// + /// Start playback of a specific stream of type . + /// + /// The type of stream to play back. + /// The ID of the stream to play back. + /// The event to start with. + /// If true, blocks returning from this method until the listener has caught up. + ///
+ /// This parameter is deprecated and will be removed in a future release. Use + /// and + /// await instead. + /// ensure the stream exists on start + /// Cancellation token to cancel waiting if blockUntilLive is true. + public void Start(Guid id, long? checkpoint = null, bool blockUntilLive = false, + bool validateStream = false, CancellationToken cancelWaitToken = default) + where TAggregate : class, IEventSource + { + if (_getReader != null) + { + using var reader = _getReader(); + reader.Read(id, () => Idle, checkpoint); + checkpoint = reader.Position; } - /// - /// Start playback of a specific stream of type TAggregate. - /// - /// The type of stream to play back. - /// The ID of the stream to play back. - /// The event to start with. - /// If true, blocks returning from this method until the listener has caught up. - /// Cancellation token to cancel waiting if blockUntilLive is true. - public void Start(Guid id, long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource + AddNewListener().Start(id, checkpoint, blockUntilLive, validateStream, cancelWaitToken); + } + + /// + /// Start playback of a specific stream of type on a task pool thread. + /// Await to know when all streams are caught up. + /// + /// The type of stream to play back. + /// The ID of the stream to play back. + /// The event to start with. + /// ensure the stream exists on start + /// Cancellation token to cancel waiting if blockUntilLive is true. + public void StartAsync(Guid id, long? checkpoint = null, bool validateStream = false, + CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource + { + _readTasks.Add(Task.Run(() => { if (_getReader != null) { - using (var reader = _getReader()) - { - reader.Read(id, () => Idle, checkpoint); - checkpoint = reader.Position; - } + using var reader = _getReader(); + reader.Read(id, () => Idle, checkpoint); + checkpoint = reader.Position; } - AddNewListener().Start(id, checkpoint, blockUntilLive, validateStream, cancelWaitToken); + + AddNewListener().Start(id, checkpoint, false, validateStream, cancelWaitToken); + }, cancelWaitToken)); + } + + /// + /// Start a category listener for type . + /// + /// The type of stream to play back. + /// The event to start with. + /// If true, blocks returning from this method until the listener has caught up. + ///
+ /// This parameter is deprecated and will be removed in a future release. Use + /// and await + /// instead. + /// ensure the stream exists on start + /// Cancellation token to cancel waiting if blockUntilLive is true. + public void Start(long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, + CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource + { + if (_getReader != null) + { + using var reader = _getReader(); + reader.Read(() => Idle, checkpoint); + checkpoint = reader.Position; } - /// - /// Start a category listener for type TAggregate. - /// - /// The type of stream to play back. - /// The event to start with. - /// If true, blocks returning from this method until the listener has caught up. - /// Cancellation token to cancel waiting if blockUntilLive is true. - public void Start(long? checkpoint = null, bool blockUntilLive = false, bool validateStream = false, CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource + AddNewListener().Start(checkpoint, blockUntilLive, validateStream, cancelWaitToken); + } + + /// + /// Start a category listener for type . + /// Events are played back on a task pool thread. + /// Await to know when all streams are caught up. + /// + /// The type of stream to play back. + /// The event to start with. + /// ensure the stream exists on start + /// Cancellation token to cancel waiting if blockUntilLive is true. + public void StartAsync(long? checkpoint = null, bool validateStream = false, + CancellationToken cancelWaitToken = default) where TAggregate : class, IEventSource + { + _readTasks.Add(Task.Run(() => { if (_getReader != null) { - using (var reader = _getReader()) - { - reader.Read(() => Idle, checkpoint); - checkpoint = reader.Position; - } + using var reader = _getReader(); + reader.Read(() => Idle, checkpoint); + checkpoint = reader.Position; } - AddNewListener().Start(checkpoint, blockUntilLive, validateStream, cancelWaitToken); - } - /// - /// Dispose of resources. - /// - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - private bool _disposed; - protected virtual void Dispose(bool disposing) + AddNewListener().Start(checkpoint, false, validateStream, cancelWaitToken); + }, cancelWaitToken)); + } + + /// + /// Dispose of resources. + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + private bool _disposed; + + protected virtual void Dispose(bool disposing) + { + if (_disposed) return; + if (disposing) { - if (_disposed) return; - if (disposing) + lock (_listeners) { - lock (_listeners) - { - _listeners?.ForEach(l => l?.Dispose()); - } - _queue?.RequestStop(); - _bus?.Dispose(); + _listeners?.ForEach(l => l?.Dispose()); } - _disposed = true; + + _queue?.RequestStop(); + _bus?.Dispose(); } - /// - /// Applies a message synchronously to the read model while ensuring that the - /// is respected and bypasses both the queue and listeners. This is primarily useful in tests. - /// - /// The message to apply. - public void DirectApply(IMessage message) { DequeueMessage(message); } - public void Handle(Message message) { ((IHandle)_queue).Handle(message); } - public void Handle(IMessage message) { ((IHandle)_queue).Handle(message); } - /// - /// Publishes a message onto the read model's internal queue. - /// This bypasses the Listeners while ensuring that the - /// is respected. All messages will be processed in order from the queue thread. - /// - /// The message to publish. - public void Publish(IMessage message) { ((IPublisher)_queue).Publish(message); } - } -} + + _disposed = true; + } + + /// + /// Applies a message synchronously to the read model while ensuring that the + /// is respected and bypasses both the queue and listeners. This is primarily useful in tests. + /// + /// The message to apply. + public void DirectApply(IMessage message) + { + DequeueMessage(message); + } + + public void Handle(Message message) + { + ((IHandle)_queue).Handle(message); + } + + public void Handle(IMessage message) + { + ((IHandle)_queue).Handle(message); + } + + /// + /// Publishes a message onto the read model's internal queue. + /// This bypasses the Listeners while ensuring that the + /// is respected. All messages will be processed in order from the queue thread. + /// + /// The message to publish. + public void Publish(IMessage message) + { + ((IPublisher)_queue).Publish(message); + } +} \ No newline at end of file From c0833fd05b3680c76b8c5488be4ddb29db9913e6 Mon Sep 17 00:00:00 2001 From: joshkempner Date: Tue, 24 Mar 2026 13:26:28 -0400 Subject: [PATCH 2/2] Reduces threads in MockStreamStoreConnection --- .../EventStore/MockStreamStoreConnection.cs | 87 ++++++++----------- 1 file changed, 36 insertions(+), 51 deletions(-) diff --git a/src/ReactiveDomain.Testing/EventStore/MockStreamStoreConnection.cs b/src/ReactiveDomain.Testing/EventStore/MockStreamStoreConnection.cs index d6e82c49..a150ce6e 100644 --- a/src/ReactiveDomain.Testing/EventStore/MockStreamStoreConnection.cs +++ b/src/ReactiveDomain.Testing/EventStore/MockStreamStoreConnection.cs @@ -1,42 +1,37 @@ using ReactiveDomain.Messaging; using ReactiveDomain.Messaging.Bus; using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Reactive; using EventStore.ClientAPI.Exceptions; -using ReactiveDomain.Util; +using ReactiveDomain.Testing.Messaging; // ReSharper disable MemberCanBePrivate.Global namespace ReactiveDomain.Testing.EventStore { public sealed class MockStreamStoreConnection : IStreamStoreConnection { - private object ReaderWriterLock = new object(); - public const string CategoryStreamNamePrefix = @"$ce"; - public const string EventTypeStreamNamePrefix = @"$et"; - public const string AllStreamName = @"$All"; + private readonly object _readerWriterLock = new(); + public const string CategoryStreamNamePrefix = "$ce"; + public const string EventTypeStreamNamePrefix = "$et"; + public const string AllStreamName = "$All"; private readonly Dictionary> _store; - private readonly List _allStream = new List(); - private readonly QueuedHandler _inboundEventHandler; - private readonly IBus _inboundEventBus; + private readonly List _allStream = []; + private readonly AdHocHandler _inboundEventHandler; + private readonly SingleThreadedBus _inboundEventBus; private readonly List _subscriptions; private bool _connected; private bool _disposed; public MockStreamStoreConnection(string name) { - _subscriptions = new List(); + _subscriptions = []; _store = new Dictionary> { { AllStreamName, new List() } }; - _inboundEventBus = new InMemoryBus(nameof(_inboundEventBus), false); + _inboundEventBus = new SingleThreadedBus(); - _inboundEventHandler = new QueuedHandler( - new AdHocHandler(_inboundEventBus.Publish), - nameof(_inboundEventHandler), - false); - _inboundEventHandler.Start(); + _inboundEventHandler = new AdHocHandler(_inboundEventBus.Publish); ConnectionName = name; } @@ -54,8 +49,8 @@ public void Close() _subscriptions.ForEach(s => s?.Dispose()); } - public event EventHandler Connected = (p1, p2) => { }; - public event EventHandler Disconnected = (p1, p2) => { }; + public event EventHandler Connected = (_, _) => { }; + public event EventHandler Disconnected = (_, _) => { }; public WriteResult AppendToStream( string stream, @@ -68,7 +63,7 @@ public WriteResult AppendToStream( if (_disposed) throw new ObjectDisposedException(nameof(MockStreamStoreConnection)); if (string.IsNullOrWhiteSpace(stream)) throw new ArgumentNullException(nameof(stream), $"{nameof(stream)} cannot be null or whitespace"); - lock (ReaderWriterLock) + lock (_readerWriterLock) { List eventStream; if (expectedVersion == ExpectedVersion.Any) @@ -85,7 +80,7 @@ public WriteResult AppendToStream( if (streamExists && expectedVersion == ExpectedVersion.NoStream) throw new WrongExpectedVersionException($"Stream {stream} exists, expected no stream"); - if (!streamExists && (expectedVersion == ExpectedVersion.StreamExists)) + if (!streamExists && expectedVersion == ExpectedVersion.StreamExists) throw new WrongExpectedVersionException($"Stream {stream} does not exist, expected stream"); if (!streamExists) @@ -182,7 +177,7 @@ public StreamEventsSlice ReadStreamForward( long count, UserCredentials credentials = null) { - lock (ReaderWriterLock) + lock (_readerWriterLock) { if (!_connected) throw new InvalidOperationException("Not Connected"); if (_disposed) throw new ObjectDisposedException(nameof(MockStreamStoreConnection)); @@ -191,8 +186,8 @@ public StreamEventsSlice ReadStreamForward( List stream; lock (_store) { - if (!_store.ContainsKey(streamName)) { return new StreamNotFoundSlice(streamName); } - stream = _store[streamName].ToList(); + if (!_store.TryGetValue(streamName, out var events)) { return new StreamNotFoundSlice(streamName); } + stream = events.ToList(); } return ReadFromStream(streamName, start, count, stream, ReadDirection.Forward); } @@ -203,7 +198,7 @@ public StreamEventsSlice ReadStreamBackward( long count, UserCredentials credentials = null) { - lock (ReaderWriterLock) + lock (_readerWriterLock) { if (!_connected) throw new InvalidOperationException("Not Connected"); if (_disposed) throw new ObjectDisposedException(nameof(MockStreamStoreConnection)); @@ -212,8 +207,8 @@ public StreamEventsSlice ReadStreamBackward( List stream; lock (_store) { - if (!_store.ContainsKey(streamName)) { return new StreamNotFoundSlice(streamName); } - stream = _store[streamName].ToList(); + if (!_store.TryGetValue(streamName, out var events)) { return new StreamNotFoundSlice(streamName); } + stream = events.ToList(); } return ReadFromStream(streamName, start, count, stream, ReadDirection.Backward); } @@ -225,7 +220,7 @@ private StreamEventsSlice ReadFromStream( List stream, ReadDirection direction) { - lock (ReaderWriterLock) + lock (_readerWriterLock) { if (!_connected) throw new InvalidOperationException("Not Connected"); if (_disposed) throw new ObjectDisposedException(nameof(MockStreamStoreConnection)); @@ -370,9 +365,9 @@ public IDisposable SubscribeToStream( long currentPos = 0; lock (_store) { - if (_store.ContainsKey(stream)) + if (_store.TryGetValue(stream, out var events)) { - currentPos = _store[stream].Count - 1; + currentPos = events.Count - 1; } } @@ -398,12 +393,12 @@ public IDisposable SubscribeToStreamFrom( if (_disposed) throw new ObjectDisposedException(nameof(MockStreamStoreConnection)); var start = (lastCheckpoint ?? -1) + 1; - RecordedEvent[] curEvents = { }; + RecordedEvent[] curEvents = []; lock (_store) { - if (_store.ContainsKey(stream)) + if (_store.TryGetValue(stream, out var events)) { - curEvents = _store[stream].Skip((int)start).ToArray(); + curEvents = events.Skip((int)start).ToArray(); } } while (curEvents.Length > 0) @@ -434,18 +429,18 @@ public IDisposable SubscribeToStreamFrom( - public IDisposable SubscribeToAllFrom(Position @from, Action eventAppeared, CatchUpSubscriptionSettings settings = null, + public IDisposable SubscribeToAllFrom(Position from, Action eventAppeared, CatchUpSubscriptionSettings settings = null, Action liveProcessingStarted = null, Action subscriptionDropped = null, UserCredentials credentials = null, bool resolveLinkTos = true) { if (!_connected) throw new InvalidOperationException("Not Connected"); if (_disposed) throw new ObjectDisposedException(nameof(MockStreamStoreConnection)); - var current = (int)@from.CommitPosition; - var currentEvents = new RecordedEvent[] { }; + var current = (int)from.CommitPosition; + RecordedEvent[] currentEvents = []; lock (_allStream) { - if (@from == Position.End) + if (from == Position.End) { current = _allStream.Count - 1; } @@ -488,7 +483,7 @@ public IDisposable SubscribeToAll( public void DeleteStream(string stream, long expectedVersion, UserCredentials credentials = null) { - lock (ReaderWriterLock) + lock (_readerWriterLock) { if (!_connected) throw new InvalidOperationException("Not Connected"); if (_disposed) throw new ObjectDisposedException(nameof(MockStreamStoreConnection)); @@ -504,21 +499,15 @@ public void DeleteStream(string stream, long expectedVersion, UserCredentials cr } lock (_store) { - if (_store.ContainsKey(stream)) - { - _store.Remove(stream); - } - if (expectedVersion == ExpectedVersion.StreamExists) - { - throw new ArgumentOutOfRangeException(); - } + _store.Remove(stream); + ArgumentOutOfRangeException.ThrowIfEqual(expectedVersion, ExpectedVersion.StreamExists); } } } public void HardDeleteStream(string stream, long expectedVersion, UserCredentials credentials = null) { - lock (ReaderWriterLock) + lock (_readerWriterLock) { if (!_connected) throw new InvalidOperationException("Not Connected"); if (_disposed) throw new ObjectDisposedException(nameof(MockStreamStoreConnection)); @@ -534,10 +523,7 @@ public void HardDeleteStream(string stream, long expectedVersion, UserCredential } lock (_store) { - if (_store.ContainsKey(stream)) - { - _store.Remove(stream); - } + _store.Remove(stream); if (expectedVersion == ExpectedVersion.StreamExists) { throw new ArgumentOutOfRangeException(); @@ -641,7 +627,6 @@ public void Dispose() _disposed = true; Close(); _subscriptions?.ForEach(s => s?.Dispose()); - _inboundEventHandler?.Stop(); } public class EventWritten : IMessage