Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;
using ReactiveDomain.Messaging;
using ReactiveDomain.Messaging.Bus;
using ReactiveDomain.Testing;
Expand All @@ -10,20 +11,20 @@ namespace ReactiveDomain.Foundation.Tests;
public class when_using_read_model_base_with_reader :
ReadModelBase,
IHandle<when_using_read_model_base_with_reader.ReadModelTestEvent>,
IClassFixture<StreamStoreConnectionFixture> {
IClassFixture<StreamStoreConnectionFixture>
{
private static IStreamStoreConnection _conn;
private static readonly IEventSerializer Serializer =
new JsonMessageSerializer();
private static readonly IStreamNameBuilder Namer =
new PrefixedCamelCaseStreamNameBuilder(nameof(when_using_read_model_base));
private static readonly JsonMessageSerializer Serializer = new();
private static readonly PrefixedCamelCaseStreamNameBuilder Namer = new(nameof(when_using_read_model_base));

private readonly string _stream1;
private readonly string _stream2;


public when_using_read_model_base_with_reader(StreamStoreConnectionFixture fixture)
: base(nameof(when_using_read_model_base),
new ConfiguredConnection(fixture.Connection, Namer, Serializer)) {
new ConfiguredConnection(fixture.Connection, Namer, Serializer))
{
_conn = fixture.Connection;
_conn.Connect();

Expand All @@ -40,48 +41,68 @@ public when_using_read_model_base_with_reader(StreamStoreConnectionFixture fixtu
_conn.TryConfirmStream(Namer.GenerateForCategory(typeof(TestAggregate)), 20);
}

private void AppendEvents(
private static void AppendEvents(
int numEventsToBeSent,
IStreamStoreConnection conn,
string streamName,
int value) {
for (int evtNumber = 0; evtNumber < numEventsToBeSent; evtNumber++) {
int value)
{
for (int evtNumber = 0; evtNumber < numEventsToBeSent; evtNumber++)
{
var evt = new ReadModelTestEvent(evtNumber, value);
conn.AppendToStream(streamName, ExpectedVersion.Any, null, Serializer.Serialize(evt));
}
}

[Fact]
public void can_start_streams_by_aggregate() {
public void can_start_streams_by_aggregate()
{
var aggId = Guid.NewGuid();
var s1 = Namer.GenerateForAggregate(typeof(TestAggregate), aggId);
AppendEvents(1, _conn, s1, 7);
Start<TestAggregate>(aggId);
AssertEx.IsOrBecomesTrue(() => Count == 1, 1000, msg: $"Expected 1 got {Count}");
AssertEx.IsOrBecomesTrue(() => Sum == 7);
}

[Fact]
public void can_start_streams_by_aggregate_category() {
public async Task can_start_streams_async_by_aggregate() {
var aggId = Guid.NewGuid();
var s1 = Namer.GenerateForAggregate(typeof(TestAggregate), aggId);
AppendEvents(1, _conn, s1, 7);
StartAsync<TestAggregate>(aggId);
await IsLive;
Assert.Equal(1, Count);
Assert.Equal(7, Sum);
}

[Fact]
public async Task can_start_streams_async_by_aggregate_category() {
var s1 = Namer.GenerateForAggregate(typeof(ReadModelTestCategoryAggregate), Guid.NewGuid());
AppendEvents(1, _conn, s1, 7);
var s2 = Namer.GenerateForAggregate(typeof(ReadModelTestCategoryAggregate), Guid.NewGuid());
AppendEvents(1, _conn, s2, 5);
Start<ReadModelTestCategoryAggregate>(null, true);
StartAsync<ReadModelTestCategoryAggregate>();

AssertEx.IsOrBecomesTrue(() => Count == 2, 1000, msg: $"Expected 2 got {Count}");
AssertEx.IsOrBecomesTrue(() => Sum == 12);
await IsLive;
Assert.Equal(2, Count);
Assert.Equal(12, Sum);
}

[Fact]
public void can_read_one_stream() {
public void can_read_one_stream()
{
Start(_stream1);
AssertEx.IsOrBecomesTrue(() => Count == 10, 1000, msg: $"Expected 10 got {Count}");
AssertEx.IsOrBecomesTrue(() => Sum == 20);
//confirm checkpoints
Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
Assert.Equal(9, GetCheckpoint()[0].Item2);
}

[Fact]
public void can_read_two_streams() {
public void can_read_two_streams()
{
Start(_stream1);
Start(_stream2);
AssertEx.IsOrBecomesTrue(() => Count == 20, 1000, msg: $"Expected 20 got {Count}");
Expand All @@ -92,24 +113,36 @@ public void can_read_two_streams() {
Assert.Equal(_stream2, GetCheckpoint()[1].Item1);
Assert.Equal(9, GetCheckpoint()[1].Item2);
}

[Fact]
public void can_wait_for_one_stream_to_go_live() {
Start(_stream1, null, true);
AssertEx.IsOrBecomesTrue(() => Count == 10, 100, msg: $"Expected 10 got {Count}");
AssertEx.IsOrBecomesTrue(() => Sum == 20, 100);
public async Task is_live_task_returns_immediately_when_no_streams_are_played_back_async()
{
await IsLive;
Assert.Equal(0, Count);
}

[Fact]
public void can_wait_for_two_streams_to_go_live() {
Start(_stream1, null, true);
AssertEx.IsOrBecomesTrue(() => Count == 10, 100, msg: $"Expected 10 got {Count}");
AssertEx.IsOrBecomesTrue(() => Sum == 20, 100);
public async Task can_await_one_stream_going_live()
{
StartAsync(_stream1);
await IsLive;
Assert.Equal(10, Count);
Assert.Equal(20, Sum);
}

Start(_stream2, null, true);
AssertEx.IsOrBecomesTrue(() => Count == 20, 100, msg: $"Expected 20 got {Count}");
AssertEx.IsOrBecomesTrue(() => Sum == 50, 100);
[Fact]
public async Task can_await_two_streams_going_live()
{
StartAsync(_stream1);
StartAsync(_stream2);
await IsLive;
Assert.Equal(20, Count);
Assert.Equal(50, Sum);
}

[Fact]
public void can_listen_to_one_stream() {
public void can_listen_to_one_stream()
{
Start(_stream1);
AssertEx.IsOrBecomesTrue(() => Count == 10, 1000, msg: $"Expected 10 got {Count}");
AssertEx.IsOrBecomesTrue(() => Sum == 20);
Expand All @@ -119,11 +152,14 @@ public void can_listen_to_one_stream() {
AssertEx.IsOrBecomesTrue(() => Sum == 70);
//confirm checkpoints
Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
Assert.Equal(19, GetCheckpoint()[0].Item2); Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
Assert.Equal(19, GetCheckpoint()[0].Item2);
Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
Assert.Equal(19, GetCheckpoint()[0].Item2);
}

[Fact]
public void can_listen_to_two_streams() {
public void can_listen_to_two_streams()
{
Start(_stream1);
Start(_stream2);
AssertEx.IsOrBecomesTrue(() => Count == 20, 1000, msg: $"Expected 20 got {Count}");
Expand All @@ -139,10 +175,12 @@ public void can_listen_to_two_streams() {
Assert.Equal(_stream2, GetCheckpoint()[1].Item1);
Assert.Equal(19, GetCheckpoint()[1].Item2);
}

[Fact]
public void can_use_checkpoint_on_one_stream() {
public void can_use_checkpoint_on_one_stream()
{
//restore state
var checkPoint = 8L;//Zero based, ignore the first 9 events
var checkPoint = 8L; //Zero based, ignore the first 9 events
Count = 9;
Sum = 18;
//start at the checkpoint
Expand All @@ -158,11 +196,13 @@ public void can_use_checkpoint_on_one_stream() {
Assert.Equal(_stream1, GetCheckpoint()[0].Item1);
Assert.Equal(19, GetCheckpoint()[0].Item2);
}

[Fact]
public void can_use_checkpoint_on_two_streams() {
public void can_use_checkpoint_on_two_streams()
{
//restore state
var checkPoint1 = 8L;//Zero based, ignore the first 9 events
var checkPoint2 = 5L;//Zero based, ignore the first 6 events
var checkPoint1 = 8L; //Zero based, ignore the first 9 events
var checkPoint2 = 5L; //Zero based, ignore the first 6 events
Count = (9) + (6);
Sum = (9 * 2) + (6 * 3);
Start(_stream1, checkPoint1);
Expand All @@ -181,8 +221,10 @@ public void can_use_checkpoint_on_two_streams() {
Assert.Equal(_stream2, GetCheckpoint()[1].Item1);
Assert.Equal(19, GetCheckpoint()[1].Item2);
}

[Fact]
public void can_listen_to_the_same_stream_twice() {
public void can_listen_to_the_same_stream_twice()
{
Assert.Equal(0, Count);
//weird but true
//n.b. Don't do this on purpose
Expand All @@ -199,10 +241,14 @@ public void can_listen_to_the_same_stream_twice() {

public long Sum { get; private set; }
public long Count { get; private set; }
void IHandle<ReadModelTestEvent>.Handle(ReadModelTestEvent @event) {

void IHandle<ReadModelTestEvent>.Handle(ReadModelTestEvent @event)
{
Sum += @event.Value;
Count++;
}

public record ReadModelTestEvent(int Number, int Value) : Event;

public class ReadModelTestCategoryAggregate : EventDrivenStateMachine;
}
Loading
Loading