Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,14 @@ await using TemporaryTopic topic = ...
IEnumerable<ServiceBusReceivedMessage> messages =
await topic.MessagesOn("<subscription-name>")

// Get subset messages currently on the topic subscription.
// Filter for a subset messages currently on the topic subscription.
.Where(msg => msg.ApplicationProperties.ContainsKey("<my-key>"))
.Where(msg => msg.ContentType == "application/json")

// Project the messages to another type.
// `ServiceBusReceivedMessage` becomes `MyOrder` in the resulting collection.
.Select(msg => msg.Body.ToObjectFromJson<MyOrder>())

// Get messages only from the dead-letter sub-queue.
.FromDeadLetter()

Expand Down Expand Up @@ -255,10 +259,14 @@ await using TemporaryQueue queue = ...
IEnumerable<ServiceBusReceivedMessage> messages =
await queue.Messages

// Get subset messages currently on the queue.
// Filter for a subset messages currently on the queue.
.Where(msg => msg.ApplicationProperties.ContainsKey("<my-key>"))
.Where(msg => msg.ContentType == "application/json")

// Project the messages to another type.
// `ServiceBusReceivedMessage` becomes `MyOrder` in the resulting collection.
.Select(msg => msg.Body.ToObjectFromJson<MyOrder>())

// Get messages only from the dead-letter sub-queue.
.FromDeadLetter()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ public class ServiceBusMessageFilter
private readonly string _entityName, _subscriptionName;
private readonly ServiceBusClient _client;
private readonly Collection<Func<ServiceBusReceivedMessage, bool>> _predicates = [];

private bool _fromDeadLetter;
private int _maxMessages = 100;

Expand All @@ -42,6 +41,22 @@ internal ServiceBusMessageFilter(string entityName, string subscriptionName, Ser
_client = client;
}

/// <summary>
/// Projects the filter to a new <see cref="ServiceBusMessageFilter{TMessageBody}"/> instance
/// that selects a subset of messages based on the given <paramref name="selection"/> function.
/// </summary>
/// <typeparam name="TMessageBody">The type of the message body to project to.</typeparam>
/// <param name="selection">The function to project the message body from the received message.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="selection"/> is <c>null</c>.</exception>
public ServiceBusMessageFilter<TMessageBody> Select<TMessageBody>(Func<ServiceBusReceivedMessage, TMessageBody> selection)
{
ArgumentNullException.ThrowIfNull(selection);

return string.IsNullOrWhiteSpace(_subscriptionName)
? new ServiceBusMessageFilter<TMessageBody>(_entityName, _client, selection)
: new ServiceBusMessageFilter<TMessageBody>(_entityName, _subscriptionName, _client, selection);
}

/// <summary>
/// Configures the filter to only select a subset of messages that matches the given <paramref name="predicate"/>.
/// Multiple calls gets aggregated.
Expand Down Expand Up @@ -100,6 +115,8 @@ public Task<bool> AnyAsync()
/// </param>
public async Task<bool> AnyAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

var messages = await ToListAsync(cancellationToken).ConfigureAwait(false);
return messages.Count > 0;
}
Expand Down Expand Up @@ -134,6 +151,7 @@ public Task<IReadOnlyList<ServiceBusReceivedMessage>> ToListAsync()
/// </param>
public async Task<IReadOnlyList<ServiceBusReceivedMessage>> ToListAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
var options = new ServiceBusReceiverOptions
{
SubQueue = _fromDeadLetter ? SubQueue.DeadLetter : SubQueue.None
Expand All @@ -154,4 +172,65 @@ await receiver.PeekMessagesAsync(_maxMessages, cancellationToken: cancellationTo
}
}
}

/// <summary>
/// Represents a configurable filter instance that selects a subset of <see cref="ServiceBusReceivedMessage"/>s on an Azure Service Bus queue or topic subscription
/// (a.k.a. 'spy test fixture').
/// </summary>
/// <typeparam name="TMessageBody">The type to which each received Azure Service Bus message is projected for filtering and selection.</typeparam>
public class ServiceBusMessageFilter<TMessageBody> : ServiceBusMessageFilter
{
private readonly Func<ServiceBusReceivedMessage, TMessageBody> _selection;
private readonly Collection<Func<TMessageBody, bool>> _predicates = [];

internal ServiceBusMessageFilter(
string entityName,
ServiceBusClient client,
Func<ServiceBusReceivedMessage, TMessageBody> selection)
: base(entityName, client)
{
_selection = selection;
}

internal ServiceBusMessageFilter(
string entityName,
string subscriptionName,
ServiceBusClient client,
Func<ServiceBusReceivedMessage, TMessageBody> selection)
: base(entityName, subscriptionName, client)
{
_selection = selection;
}

/// <summary>
/// Configures the filter to only select a subset of messages that matches the given <paramref name="predicate"/>.
/// Multiple calls gets aggregated.
/// </summary>
/// <param name="predicate">The custom predicate to match a message.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="predicate"/> is <c>null</c>.</exception>
public ServiceBusMessageFilter Where(Func<TMessageBody, bool> predicate)
{
ArgumentNullException.ThrowIfNull(predicate);

_predicates.Add(predicate);
return this;
}

/// <summary>
/// Creates a <see cref="List{T}"/> from a filtered <see cref="ServiceBusReceivedMessage"/> collection
/// </summary>
/// <remarks>
/// Deferred messages are also included as messages are peeked.
/// </remarks>
/// <param name="cancellationToken">
/// The optional <see cref="CancellationToken" /> instance to signal the request to cancel the operation.
/// </param>
public new async Task<IReadOnlyList<TMessageBody>> ToListAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

var messages = await base.ToListAsync(cancellationToken).ConfigureAwait(false);
return messages.Select(_selection).Where(msg => _predicates.All(predicate => predicate(msg))).ToList();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Arcus.Testing.Tests.Integration.Messaging.Configuration;
using Azure.Identity;
Expand All @@ -11,6 +12,7 @@
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Sdk;
using static Arcus.Testing.Tests.Integration.Messaging.Fixture.ServiceBusTestContext;

namespace Arcus.Testing.Tests.Integration.Messaging.Fixture
{
Expand Down Expand Up @@ -200,9 +202,10 @@ public async Task<ServiceBusMessage> WhenMessageSentAsync(string entityName, Ser

public ServiceBusMessage WhenMessageUnsent()
{
var message = new ServiceBusMessage(Bogus.Random.Bytes(10))
var pizza = Pizza.Generate();
var message = new ServiceBusMessage(JsonSerializer.SerializeToUtf8Bytes(pizza))
{
MessageId = $"test-{Bogus.Random.Guid()}"
MessageId = pizza.Id,
};

return message;
Expand Down Expand Up @@ -266,28 +269,6 @@ public async Task ShouldNotHaveTopicSubscriptionRuleAsync(string topicName, stri
Assert.False(await _adminClient.RuleExistsAsync(topicName, subscriptionName, ruleName), $"Azure Service Bus topic subscription '{subscriptionName}' should not have a rule '{ruleName}', but it has");
}

/// <summary>
/// Verifies that the message is left alone on the Azure Service bus entity.
/// </summary>
public async Task ShouldLeaveMessageAsync(string entityName, ServiceBusMessage message)
{
await ShouldLeaveMessageAsync(entityName, subscriptionName: null, message);
}

/// <summary>
/// Verifies that the message is left alone on the Azure Service bus entity.
/// </summary>
public async Task ShouldLeaveMessageAsync(string entityName, string subscriptionName, ServiceBusMessage message)
{
await Poll.UntilAvailableAsync<XunitException>(async () =>
{
await using ServiceBusReceiver receiver = CreateReceiver(entityName, subscriptionName);

IEnumerable<ServiceBusReceivedMessage> messages = await receiver.PeekMessagesAsync(100);
Assert.True(messages.Any(actual => actual.MessageId == message.MessageId), $"Azure Service bus '{entityName}' should have message '{message.MessageId}' still available on the bus, but it is not");
});
}

/// <summary>
/// Verifies that the message is dead-lettered on the Azure Service bus entity.
/// </summary>
Expand Down Expand Up @@ -337,6 +318,26 @@ private ServiceBusReceiver CreateReceiver(string entityName, string subscription
: _messagingClient.CreateReceiver(entityName, subscriptionName, options);
}

internal sealed class Pizza
{
public string Id { get; set; }
public string Name { get; set; }
public string Description { get; set; }
public decimal Price { get; set; }
public decimal Balance { get; set; }

public static Pizza Generate()
{
return new Faker<Pizza>()
.RuleFor(p => p.Id, f => $"test-{f.Random.Guid()}")
.RuleFor(p => p.Name, f => f.Commerce.ProductName())
.RuleFor(p => p.Description, f => f.Lorem.Sentence())
.RuleFor(p => p.Price, f => f.Random.Decimal(10, 100))
.RuleFor(p => p.Balance, f => f.Random.Decimal(0, 100))
.Generate();
}
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources asynchronously.
/// </summary>
Expand Down Expand Up @@ -384,4 +385,88 @@ public async ValueTask DisposeAsync()
disposables.Add(_messagingClient);
}
}

internal static class ServiceBusMessageFilterTestExtensions
{
internal static ServiceBusMessageFilter<Pizza> SelectPizza(this ServiceBusMessageFilter filter)
{
return filter.Select(msg => msg.Body.ToObjectFromJson<Pizza>());
}

/// <summary>
/// Verifies that the message is completed on the Azure Service bus entity.
/// </summary>
internal static Task ShouldCompletedMessageAsync(this TemporaryTopic topic, string subscriptionName, ServiceBusMessage message)
{
return ShouldCompletedMessageAsync(topic.MessagesOn(subscriptionName), topic.Name, message);
}

/// <summary>
/// Verifies that the message is completed on the Azure Service bus entity.
/// </summary>
internal static Task ShouldCompletedMessageAsync(this TemporaryQueue queue, ServiceBusMessage message)
{
return ShouldCompletedMessageAsync(queue.Messages, queue.Name, message);
}

private static async Task ShouldCompletedMessageAsync(ServiceBusMessageFilter filter, string entityName, ServiceBusMessage message)
{
var messagesUntyped = await filter.ToListAsync(TestContext.Current.CancellationToken);
Assert.False(messagesUntyped.Any(actual => actual.MessageId == message.MessageId), $"Azure Service bus '{entityName}' should have completed message '{message.MessageId}', but it can still be found on the queue");

var messagesTyped = await filter.SelectPizza().ToListAsync(TestContext.Current.CancellationToken);
Assert.False(messagesTyped.Any(actual => actual.Id == message.MessageId), $"Azure Service bus '{entityName}' should have completed message '{message.MessageId}', but it can still be found on the queue");

var deadLetteredMessages = await filter.FromDeadLetter().ToListAsync(TestContext.Current.CancellationToken);
Assert.False(deadLetteredMessages.Any(actual => actual.MessageId == message.MessageId), $"Azure Service bus '{entityName}' should have completed message '{message.MessageId}', but it can still be found on the dead-lettered queue");
}

/// <summary>
/// Verifies that the message is left alone on the Azure Service bus entity.
/// </summary>
public static Task ShouldLeaveMessageAsync(this TemporaryQueue queue, ServiceBusMessage message)
{
return ShouldLeaveMessageAsync(queue.Messages, queue.Name, message);
}

/// <summary>
/// Verifies that the message is left alone on the Azure Service bus entity.
/// </summary>
public static Task ShouldLeaveMessageAsync(this TemporaryTopic topic, string subscriptionName, ServiceBusMessage message)
{
return ShouldLeaveMessageAsync(topic.MessagesOn(subscriptionName), topic.Name, message);
}

private static Task ShouldLeaveMessageAsync(ServiceBusMessageFilter filter, string entityName, ServiceBusMessage message)
{
return Poll.UntilAvailableAsync<XunitException>(async () =>
{
Assert.True(
await filter.Where(actual => actual.MessageId == message.MessageId).AnyAsync(TestContext.Current.CancellationToken),
$"Azure Service bus '{entityName}' should have message '{message.MessageId}' still available on the bus, but it is not");
});
}

/// <summary>
/// Verifies that the message is dead-lettered on the Azure Service bus entity.
/// </summary>
public static Task ShouldDeadLetteredMessageAsync(this TemporaryQueue queue, ServiceBusMessage message)
{
return ShouldDeadLetteredMessageAsync(queue.Messages, queue.Name, message);
}

/// <summary>
/// Verifies that the message is dead-lettered on the Azure Service bus entity.
/// </summary>
public static Task ShouldDeadLetteredMessageAsync(this TemporaryTopic topic, string subscriptionName, ServiceBusMessage message)
{
return ShouldDeadLetteredMessageAsync(topic.MessagesOn(subscriptionName), topic.Name, message);
}

private static async Task ShouldDeadLetteredMessageAsync(ServiceBusMessageFilter filter, string entityName, ServiceBusMessage message)
{
IEnumerable<ServiceBusReceivedMessage> messages = await filter.FromDeadLetter().ToListAsync(TestContext.Current.CancellationToken);
Assert.True(messages.Any(actual => actual.MessageId == message.MessageId), $"Azure Service bus '{entityName}' should have dead-lettered message '{message.MessageId}', but can't find it in the dead-letter sub-queue");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public async Task CreateTempQueue_OnExistingQueue_SucceedsByLeavingAfterLifetime

// Assert
await serviceBus.ShouldHaveQueueAsync(queueName);
await serviceBus.ShouldLeaveMessageAsync(queueName, messageBefore);
await temp.ShouldLeaveMessageAsync(messageBefore);

ServiceBusMessage messageAfter = await serviceBus.WhenMessageSentAsync(queueName);

Expand Down Expand Up @@ -76,11 +76,8 @@ public async Task CreateTempQueueWithDeadLetterOnSetup_OnExistingQueueWithMessag

// Assert
await serviceBus.ShouldHaveQueueAsync(queueName);
await serviceBus.ShouldDeadLetteredMessageAsync(queueName, messageDeadLetterBefore);
await serviceBus.ShouldCompletedMessageAsync(queueName, messageCompleteBefore);

Assert.True(await temp.Messages.FromDeadLetter().Where(msg => msg.MessageId == messageDeadLetterBefore.MessageId).AnyAsync(TestContext.Current.CancellationToken), $"temp queue should have found dead-lettered message '{messageDeadLetterBefore.MessageId}'");
Assert.False(await temp.Messages.FromDeadLetter().Where(msg => msg.MessageId == messageCompleteBefore.MessageId).AnyAsync(TestContext.Current.CancellationToken), $"temp queue should not have found completed message '{messageCompleteBefore.MessageId}'");
await temp.ShouldDeadLetteredMessageAsync(messageDeadLetterBefore);
await temp.ShouldCompletedMessageAsync(messageCompleteBefore);

ServiceBusMessage messageAfter = await serviceBus.WhenMessageSentAsync(queueName);

Expand Down Expand Up @@ -108,11 +105,8 @@ public async Task CreateTempQueueWithCompleteOnSetup_OnExistingQueueWithMessage_

// Assert
await serviceBus.ShouldHaveQueueAsync(queueName);
await serviceBus.ShouldCompletedMessageAsync(queueName, messageCompleteBefore);
await serviceBus.ShouldDeadLetteredMessageAsync(queueName, messageDeadLetterBefore);

Assert.Empty(await temp.Messages.Where(msg => msg.MessageId == messageCompleteBefore.MessageId));
Assert.Single(await temp.Messages.FromDeadLetter().Take(10).Where(msg => msg.MessageId == messageDeadLetterBefore.MessageId));
await temp.ShouldCompletedMessageAsync(messageCompleteBefore);
await temp.ShouldDeadLetteredMessageAsync(messageDeadLetterBefore);
}

[Fact]
Expand All @@ -136,8 +130,7 @@ public async Task CreateTempQueueWithCompleteOnTeardown_OnExistingQueueWithMessa

// Assert
await serviceBus.ShouldHaveQueueAsync(queueName);
await serviceBus.ShouldLeaveMessageAsync(queueName, messageCompleteBefore1);
Assert.Single(await temp.Messages.Where(msg => msg.MessageId == messageCompleteBefore1.MessageId));
await temp.ShouldLeaveMessageAsync(messageCompleteBefore1);

ServiceBusMessage messageCompleteAfter = serviceBus.WhenMessageUnsent(),
messageDeadLetterAfter = serviceBus.WhenMessageUnsent();
Expand Down
Loading
Loading