diff --git a/Ev.ServiceBus.sln b/Ev.ServiceBus.sln index 97903ee..f25be5f 100644 --- a/Ev.ServiceBus.sln +++ b/Ev.ServiceBus.sln @@ -24,6 +24,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{15791C07-7 docs\Ev.ServiceBus.HealthChecks.md = docs\Ev.ServiceBus.HealthChecks.md docs\Ev.ServiceBus.Mvc.md = docs\Ev.ServiceBus.Mvc.md docs\Instrumentation.md = docs\Instrumentation.md + docs\Isolation.md = docs\Isolation.md EndProjectSection EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{8EC286EB-A71F-4431-ACA4-E92C79975817}" diff --git a/README.md b/README.md index a6901f4..5728ae9 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ across the whole system (name of the property : `PayloadTypeId`). * [Initial Set up](./docs/SetUp.md) * [How to send messages](./docs/SendMessages.md) * [How to receive messages](./docs/ReceiveMessages.md) +* [Isolation Feature](./docs/Isolation.md) * [Advanced scenarios](./docs/AdvancedScenarios.md) * [Ev.ServiceBus.HealthChecks](./docs/Ev.ServiceBus.HealthChecks.md) * [Instrumentation](./docs/Instrumentation.md) diff --git a/docs/AdvancedScenarios.md b/docs/AdvancedScenarios.md index df9bec7..6ba9aec 100644 --- a/docs/AdvancedScenarios.md +++ b/docs/AdvancedScenarios.md @@ -201,21 +201,3 @@ public class MyMessageSender } } ``` - -## Running service instance in isolation - -By "running in isolation" we mean ability to run an instance of microservice (MS) on local environment, when MS is using real queues and topics to communicate with other MSs -Simple case : App1 sending message to App2, which responds back with another message. -Desired isolation is in the following: If message comes from App1.Local, then the response to it should be processed also only by App1.Local. Same holds true from App1.Cloud - it should process responses that are only coming from cloud instances. - -This is achived by simply proper condifuration (see sample project) -```csharp -services.AddServiceBus( - settings => - { - // Provide a connection string here ! - settings.WithConnection("Endpoint=sb://yourconnection.servicebus.windows.net/;SharedAccessKeyName=yourkeyh;SharedAccessKey=ae6pTuOBAFDH2y7xJJf9BFubZGxXMToN6B9NiVgLnbQ=", new ServiceBusClientOptions()); - settings.UseIsolation = true; - settings.IsolationKey = "your-isolation-key"; - }) -``` \ No newline at end of file diff --git a/docs/Isolation.md b/docs/Isolation.md new file mode 100644 index 0000000..3a4ff66 --- /dev/null +++ b/docs/Isolation.md @@ -0,0 +1,88 @@ +# Isolation feature + +## Concept + +In a microservice ecosystem where those services communicate through messages via Service bus queues or topics, some scenarios become difficult to debug. +For example, sometimes you want to manually send a message to a queue and debug in local how your microservice behaves. That would require you to recreate a service bus namespace with the proper queue configured and link app running in local to it. +Another example is, sometimes you want to send a message to one microservice to see and debug how another microservice behaves down the line. + +The isolation feature allows your local application to connect to the service bus namespace of an up and running environment without disturbing it's stability. +It then allows you to manually send messages with some metadata to that queue and you local application will receive it instead of the app running in the environment. + +## Configuration + +### Default configuration +By default, the isolation feature is disabled. your application will treat every incoming messages. +```csharp +services.AddServiceBus( + settings => + { + // Provide a connection string here ! + settings.WithConnection("Endpoint=sb://yourconnection.servicebus.windows.net/;SharedAccessKeyName=yourkeyh;SharedAccessKey=ae6pTuOBAFDH2y7xJJf9BFubZGxXMToN6B9NiVgLnbQ=", new ServiceBusClientOptions()); + settings.WithIsolation(IsolationBehavior.HandleAllMessages, null, null); + }) +``` + +### Environment configuration +To use the feature, you need to activate isolation both on your local app and on your environment. + +Applications running in your environment must use the "HandleNonIsolatedMessages" behavior. +You also need to provide a name for your microservice. + +```csharp +services.AddServiceBus( + settings => + { + // Provide a connection string here ! + settings.WithConnection("Endpoint=sb://yourconnection.servicebus.windows.net/;SharedAccessKeyName=yourkeyh;SharedAccessKey=ae6pTuOBAFDH2y7xJJf9BFubZGxXMToN6B9NiVgLnbQ=", new ServiceBusClientOptions()); + settings.WithIsolation(IsolationBehavior.HandleNonIsolatedMessages, null, "My.Application"); + }) +``` + +### Local configuration +To use the feature, you need to activate isolation both on your local app and on your environment. + +The Application running in local must use the "HandleIsolatedMessage" behavior. +You also need to provide a name for your microservice and an isolation key. +Your local application will only process messages that have the same isolation key as the one you provide. + +```csharp +services.AddServiceBus( + settings => + { + // Provide a connection string here ! + settings.WithConnection("Endpoint=sb://yourconnection.servicebus.windows.net/;SharedAccessKeyName=yourkeyh;SharedAccessKey=ae6pTuOBAFDH2y7xJJf9BFubZGxXMToN6B9NiVgLnbQ=", new ServiceBusClientOptions()); + settings.WithIsolation(IsolationBehavior.HandleIsolatedMessage, "My.IsolationKey", "My.Application"); + }) +``` + +## Usage + +### Simple case +Now to use the feature, you will need to send a message with the proper metadata to the queue/topic of your environment. + +Let's say you have a microservice `App1` and you want to run that application on local for debugging purposes. +You will need to send a message to the queue/topic of `App1` with the following metadata: +```json +{ + "IsolationKey": "My.IsolationKey", + "ApplicationName": "App1" +} +``` +Your local app will receive the message and process it. + +### Complex case +Now let's say that in your environment you have the following microservices: `App1`, `App2`, `App3`, `App4`, `App5`. +They communicate with each other through messages. +The entry point of your ecosystem is `App1`, but you want to debug `App2` and `App3` locally. + +You will need to send a message to the queue/topic of `App1` with the following metadata: +```json +{ + "IsolationKey": "My.IsolationKey", + "ApplicationName": "App2,App3" +} +``` +The isolation metadata will be transferred to any message sent during the processing of the isolated message. + +Meaning that your local apps will be able to receive subsequent isolated messages allowing you to debug them. diff --git a/samples/Ev.ServiceBus.Samples.Receiver/Program.cs b/samples/Ev.ServiceBus.Samples.Receiver/Program.cs index 6d79d74..0d1f973 100644 --- a/samples/Ev.ServiceBus.Samples.Receiver/Program.cs +++ b/samples/Ev.ServiceBus.Samples.Receiver/Program.cs @@ -1,4 +1,5 @@ using Azure.Messaging.ServiceBus; +using Ev.ServiceBus.Abstractions; using Ev.ServiceBus.AsyncApi; using Ev.ServiceBus.Prometheus; using Ev.ServiceBus.Sample.Contracts; @@ -73,6 +74,12 @@ private static WebApplicationBuilder CreateHostBuilder(string[] args) settings.WithConnection( "Endpoint=sb://yourconnection.servicebus.windows.net/;SharedAccessKeyName=yourkeyh;SharedAccessKey=ae6pTuOBAFDH2y7xJJf9BFubZGxXMToN6B9NiVgLnbQ=", new ServiceBusClientOptions()); + + // + settings.WithIsolation( + IsolationBehavior.HandleIsolatedMessages, + "MyIsolationKey", + "Company.ReceiverApp"); }) // Enables you to execute code whenever execution of a message starts, succeeded or failed .RegisterEventListener() diff --git a/src/Ev.ServiceBus.Abstractions/Configuration/ServiceBusSettings.cs b/src/Ev.ServiceBus.Abstractions/Configuration/ServiceBusSettings.cs index cbacfb6..a490d4a 100644 --- a/src/Ev.ServiceBus.Abstractions/Configuration/ServiceBusSettings.cs +++ b/src/Ev.ServiceBus.Abstractions/Configuration/ServiceBusSettings.cs @@ -23,14 +23,7 @@ public sealed class ServiceBusSettings /// public ConnectionSettings? ConnectionSettings { get; private set; } - /// - /// When true, The application will subscribe to getting messages from topic in isolation mode, using IsolationKey as differentiator - /// - public bool UseIsolation { get; set; } = false; - /// - /// Key that is used to determine if this running instance should receive and complete or abandon a message - /// - public string? IsolationKey { get; set; } = null; + public IsolationSettings IsolationSettings { get; internal set; } = new (IsolationBehavior.HandleAllMessages, null, null); /// /// Sets the default Connection to use for every resource. (this can be overriden on each and every resource you want) @@ -41,4 +34,30 @@ public void WithConnection(string connectionString, ServiceBusClientOptions opti { ConnectionSettings = new ConnectionSettings(connectionString, options); } + + public void WithIsolation(IsolationBehavior behavior, string? isolationKey = null, string? applicationName = null) + { + IsolationSettings = new IsolationSettings(behavior, isolationKey, applicationName); + } +} + +public class IsolationSettings +{ + public IsolationSettings(IsolationBehavior behavior, string? isolationKey, string? applicationName) + { + IsolationBehavior = behavior; + IsolationKey = isolationKey; + ApplicationName = applicationName; + } + + public IsolationBehavior IsolationBehavior { get; private set; } + public string? IsolationKey { get; private set; } + public string? ApplicationName { get; private set; } +} + +public enum IsolationBehavior +{ + HandleAllMessages = 1, + HandleIsolatedMessages, + HandleNonIsolatedMessages } \ No newline at end of file diff --git a/src/Ev.ServiceBus.Abstractions/Exceptions/ConfigurationException.cs b/src/Ev.ServiceBus.Abstractions/Exceptions/ConfigurationException.cs new file mode 100644 index 0000000..a6416c3 --- /dev/null +++ b/src/Ev.ServiceBus.Abstractions/Exceptions/ConfigurationException.cs @@ -0,0 +1,10 @@ +using System; + +namespace Ev.ServiceBus.Abstractions; + +public class ConfigurationException : Exception +{ + public ConfigurationException(string message) : base(message) + { + } +} diff --git a/src/Ev.ServiceBus.Abstractions/MessageHelper.cs b/src/Ev.ServiceBus.Abstractions/MessageHelper.cs index f4ff8a4..94b2630 100644 --- a/src/Ev.ServiceBus.Abstractions/MessageHelper.cs +++ b/src/Ev.ServiceBus.Abstractions/MessageHelper.cs @@ -1,6 +1,7 @@ using System; using Azure.Messaging.ServiceBus; using System.Collections.Generic; +using System.Linq; namespace Ev.ServiceBus.Abstractions; @@ -38,6 +39,22 @@ public static ServiceBusMessage CreateMessage(string contentType, byte[] body, s return message; } + public static string[] GetIsolationApps(this ServiceBusReceivedMessage message) + { + var appsString = TryGetValue(message, UserProperties.IsolationApps); + if (string.IsNullOrEmpty(appsString)) + return []; + return appsString.Split(','); + } + + public static string[] GetIsolationApps(this IReadOnlyDictionary applicationProperties) + { + if (applicationProperties == null) + return []; + applicationProperties.TryGetValue(UserProperties.IsolationApps, out var value); + return value == null ? [] : ((string)value).Split(','); + } + public static string? GetIsolationKey(this ServiceBusReceivedMessage message) { return TryGetValue(message, UserProperties.IsolationKey); @@ -64,6 +81,14 @@ public static ServiceBusMessage SetIsolationKey(this ServiceBusMessage message, return message; } + public static ServiceBusMessage SetIsolationApps(this ServiceBusMessage message, string[] isolationApps) + { + if (isolationApps.Length == 0) + return message; + message.ApplicationProperties[UserProperties.IsolationApps] = string.Join(',', isolationApps); + return message; + } + public static void SetIsolationKey(this IDictionary applicationProperties, string? isolationKey) { if (string.IsNullOrEmpty(isolationKey)) diff --git a/src/Ev.ServiceBus.Abstractions/MessageReception/MessageContext.cs b/src/Ev.ServiceBus.Abstractions/MessageReception/MessageContext.cs index 79f0501..461e12d 100644 --- a/src/Ev.ServiceBus.Abstractions/MessageReception/MessageContext.cs +++ b/src/Ev.ServiceBus.Abstractions/MessageReception/MessageContext.cs @@ -17,6 +17,7 @@ public MessageContext(ProcessSessionMessageEventArgs args, ClientType clientType CancellationToken = args.CancellationToken; PayloadTypeId = Message.GetPayloadTypeId(); IsolationKey = Message.GetIsolationKey(); + IsolationApps = Message.GetIsolationApps(); } public MessageContext(ProcessMessageEventArgs args, ClientType clientType, string resourceId) @@ -28,6 +29,7 @@ public MessageContext(ProcessMessageEventArgs args, ClientType clientType, strin CancellationToken = args.CancellationToken; PayloadTypeId = Message.GetPayloadTypeId(); IsolationKey = Message.GetIsolationKey(); + IsolationApps = Message.GetIsolationApps(); } public ServiceBusReceivedMessage Message { get; } @@ -40,6 +42,7 @@ public MessageContext(ProcessMessageEventArgs args, ClientType clientType, strin public string? PayloadTypeId { get; internal set; } public MessageReceptionRegistration? ReceptionRegistration { get; internal set; } public string? IsolationKey { get; internal set; } + public string[] IsolationApps { get; internal set; } public MessageExecutionContext ReadExecutionContext() { diff --git a/src/Ev.ServiceBus.Abstractions/UserProperties.cs b/src/Ev.ServiceBus.Abstractions/UserProperties.cs index ab699e0..9e74639 100644 --- a/src/Ev.ServiceBus.Abstractions/UserProperties.cs +++ b/src/Ev.ServiceBus.Abstractions/UserProperties.cs @@ -8,4 +8,5 @@ public static class UserProperties public const string PayloadTypeIdProperty = "PayloadTypeId"; public const string MessageTypeProperty = "MessageType"; public const string IsolationKey = "IsolationKey"; + public const string IsolationApps = "IsolationApps"; } \ No newline at end of file diff --git a/src/Ev.ServiceBus/Dispatch/DispatchSender.cs b/src/Ev.ServiceBus/Dispatch/DispatchSender.cs index f304b23..480402a 100644 --- a/src/Ev.ServiceBus/Dispatch/DispatchSender.cs +++ b/src/Ev.ServiceBus/Dispatch/DispatchSender.cs @@ -239,8 +239,6 @@ private ServiceBusMessage CreateMessage( MessageDispatchRegistration registration, Abstractions.Dispatch dispatch) { - var originalCorrelationId = _messageMetadataAccessor.Metadata?.CorrelationId ?? Guid.NewGuid().ToString(); - var originalIsolationKey = _messageMetadataAccessor.Metadata?.ApplicationProperties.GetIsolationKey(); var result = _messagePayloadSerializer.SerializeBody(dispatch.Payload); var message = MessageHelper.CreateMessage(result.ContentType, result.Body, registration.PayloadTypeId); @@ -251,10 +249,20 @@ private ServiceBusMessage CreateMessage( } message.SessionId = dispatch.SessionId; + + var originalCorrelationId = _messageMetadataAccessor.Metadata?.CorrelationId ?? Guid.NewGuid().ToString(); message.CorrelationId = dispatch.CorrelationId ?? originalCorrelationId; - message.SetIsolationKey(originalIsolationKey ?? _serviceBusOptions.Settings.IsolationKey); + + var originalIsolationKey = _messageMetadataAccessor.Metadata?.ApplicationProperties.GetIsolationKey(); + message.SetIsolationKey(originalIsolationKey ?? _serviceBusOptions.Settings.IsolationSettings.IsolationKey); + + var originalIsolationApps = _messageMetadataAccessor.Metadata?.ApplicationProperties.GetIsolationApps() ?? []; + message.SetIsolationApps(originalIsolationApps); + if (dispatch.DiagnosticId != null) + { message.SetDiagnosticIdIfIsNot(dispatch.DiagnosticId); + } if (!string.IsNullOrWhiteSpace(dispatch.MessageId)) { message.MessageId = dispatch.MessageId; diff --git a/src/Ev.ServiceBus/Isolation/IsolationService.cs b/src/Ev.ServiceBus/Isolation/IsolationService.cs new file mode 100644 index 0000000..cafce2f --- /dev/null +++ b/src/Ev.ServiceBus/Isolation/IsolationService.cs @@ -0,0 +1,117 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using Ev.ServiceBus.Abstractions; +using Ev.ServiceBus.Abstractions.MessageReception; +using Ev.ServiceBus.Management; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace Ev.ServiceBus.Isolation; + +public class IsolationService +{ + private readonly IOptions _options; + private readonly ILogger _logger; + private readonly ServiceBusRegistry _registry; + private readonly IMessageMetadataAccessor _messageMetadataAccessor; + private readonly IsolationSettings _isolationSettings; + + public IsolationService( + IOptions options, + ILogger logger, + ServiceBusRegistry registry, + IMessageMetadataAccessor messageMetadataAccessor) + { + _options = options; + _logger = logger; + _registry = registry; + _messageMetadataAccessor = messageMetadataAccessor; + _isolationSettings = options.Value.Settings.IsolationSettings; + } + + public async Task HandleIsolation(MessageContext context) + { + return _isolationSettings.IsolationBehavior switch + { + IsolationBehavior.HandleAllMessages => await HandleAllMessages(context), + IsolationBehavior.HandleIsolatedMessages => await HandleIsolatedMessage(context), + IsolationBehavior.HandleNonIsolatedMessages => await HandleNonIsolatedMessages(context), + _ => throw new ArgumentOutOfRangeException() + }; + } + + private async Task HandleNonIsolatedMessages(MessageContext context) + { + if (context.IsolationApps.Contains(_isolationSettings.ApplicationName) == false) + { + return true; + } + if (string.IsNullOrEmpty(context.IsolationKey)) + { + return true; + } + + _logger.IgnoreMessage("", context.IsolationKey); + + await _messageMetadataAccessor.Metadata!.CompleteMessageAsync(); + + await SendToSourceAsync(context, new ServiceBusMessage(context.Message)); + return false; + } + + private async Task HandleIsolatedMessage(MessageContext context) + { + if (context.IsolationKey == _isolationSettings.IsolationKey + && context.IsolationApps.Contains(_isolationSettings.ApplicationName)) + { + return true; + } + + _logger.IgnoreMessage(_isolationSettings.IsolationKey, context.IsolationKey); + + await _messageMetadataAccessor.Metadata!.CompleteMessageAsync(); + + await SendToSourceAsync(context, new ServiceBusMessage(context.Message)); + return false; + } + + private Task HandleAllMessages(MessageContext context) + { + return Task.FromResult(true); + } + + private async Task SendToSourceAsync( + MessageContext messageContext, + ServiceBusMessage message) + { + var senderInfo = GetSenderResourceId(messageContext); + + // Try to get existing sender + var sender = _registry.TryGetMessageSender(senderInfo.ClientType, senderInfo.ResourceId); + if (sender != null) + { + await sender.SendMessageAsync(message, messageContext.CancellationToken); + return; + } + + // Create a temporary sender if no registered sender exists + var connectionSettings = _options.Value.Settings.ConnectionSettings!; + var client = _registry.CreateOrGetServiceBusClient(connectionSettings)!; + + await using var tempSender = client.CreateSender(senderInfo.ResourceId); + await tempSender.SendMessageAsync(message, messageContext.CancellationToken); + } + + private (ClientType ClientType, string ResourceId) GetSenderResourceId(MessageContext messageContext) + { + return messageContext.ClientType switch + { + ClientType.Subscription => + // For subscriptions, ResourceId is in format "topicName/Subscriptions/subscriptionName" + (ClientType.Topic, messageContext.ResourceId.Split('/')[0]), + _ => (ClientType.Queue, messageContext.ResourceId) + }; + } +} diff --git a/src/Ev.ServiceBus/Reception/Extensions/MessageContextExtensions.cs b/src/Ev.ServiceBus/Reception/Extensions/MessageContextExtensions.cs deleted file mode 100644 index a2b1e82..0000000 --- a/src/Ev.ServiceBus/Reception/Extensions/MessageContextExtensions.cs +++ /dev/null @@ -1,79 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Azure.Messaging.ServiceBus; -using Ev.ServiceBus.Abstractions; -using Ev.ServiceBus.Abstractions.MessageReception; -using Ev.ServiceBus.Management; - -namespace Ev.ServiceBus.Reception.Extensions; - -public static class MessageContextExtensions -{ - public static async Task CompleteAndResendMessageAsync(this MessageContext messageContext, - IMessageMetadataAccessor messageMetadataAccessor, - ServiceBusRegistry registry, - ConnectionSettings connectionSettings) - { - await messageMetadataAccessor.Metadata!.CompleteMessageAsync(); - - await SendToSourceAsync(messageContext, new ServiceBusMessage(messageContext.Message), - registry, connectionSettings, messageContext.CancellationToken); - } - - private static async Task SendToSourceAsync(this MessageContext messageContext, - ServiceBusMessage message, - ServiceBusRegistry registry, - ConnectionSettings connectionSettings, - CancellationToken cancellationToken) - { - if (messageContext == null) throw new ArgumentNullException(nameof(messageContext)); - if (message == null) throw new ArgumentNullException(nameof(message)); - if (registry == null) throw new ArgumentNullException(nameof(registry)); - - switch (messageContext.ClientType) - { - case ClientType.Queue: - { - // Try to get existing sender - var sender = registry.TryGetMessageSender(messageContext.ClientType, messageContext.ResourceId); - if (sender != null) - { - await sender.SendMessageAsync(message, cancellationToken); - return; - } - - // Create a temporary sender if no registered sender exists - var client = registry.CreateOrGetServiceBusClient(connectionSettings) - ?? throw new InvalidOperationException("Failed to create ServiceBusClient"); - - await using var tempSender = client.CreateSender(messageContext.ResourceId); - await tempSender.SendMessageAsync(message, cancellationToken); - break; - } - case ClientType.Subscription: - { - // For subscriptions, ResourceId is in format "topicName/Subscriptions/subscriptionName" - var topicName = messageContext.ResourceId.Split('/')[0]; - - // Try to get existing sender for the topic - var sender = registry.TryGetMessageSender(ClientType.Topic, topicName); - if (sender != null) - { - await sender.SendMessageAsync(message, cancellationToken); - return; - } - - // Create a temporary sender if no registered sender exists - var client = registry.CreateOrGetServiceBusClient(connectionSettings) - ?? throw new InvalidOperationException("Failed to create ServiceBusClient"); - - await using var tempSender = client.CreateSender(topicName); - await tempSender.SendMessageAsync(message, cancellationToken); - break; - } - default: - throw new ArgumentException($"Unsupported client type: {messageContext.ClientType}", nameof(messageContext)); - } - } -} \ No newline at end of file diff --git a/src/Ev.ServiceBus/Reception/MessageReceptionHandler.cs b/src/Ev.ServiceBus/Reception/MessageReceptionHandler.cs index 250ed22..f75e619 100644 --- a/src/Ev.ServiceBus/Reception/MessageReceptionHandler.cs +++ b/src/Ev.ServiceBus/Reception/MessageReceptionHandler.cs @@ -8,11 +8,10 @@ using Ev.ServiceBus.Abstractions.MessageReception; using Ev.ServiceBus.Diagnostics; using Ev.ServiceBus.Exceptions; +using Ev.ServiceBus.Isolation; using Ev.ServiceBus.Management; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Ev.ServiceBus.Reception.Extensions; namespace Ev.ServiceBus.Reception; @@ -24,8 +23,7 @@ public class MessageReceptionHandler private readonly MessageMetadataAccessor _messageMetadataAccessor; private readonly IEnumerable _eventListeners; private readonly IServiceProvider _provider; - private readonly ServiceBusOptions _serviceBusOptions; - private readonly ServiceBusRegistry _registry; + private readonly IsolationService _isolationService; public MessageReceptionHandler( IServiceProvider provider, @@ -33,17 +31,15 @@ public MessageReceptionHandler( ILogger logger, IMessageMetadataAccessor messageMetadataAccessor, IEnumerable eventListeners, - IOptions serviceBusOptions, - ServiceBusRegistry registry) + IsolationService isolationService) { _provider = provider; _messagePayloadSerializer = messagePayloadSerializer; _logger = logger; _messageMetadataAccessor = (MessageMetadataAccessor)messageMetadataAccessor; _eventListeners = eventListeners; - _serviceBusOptions = serviceBusOptions.Value; _callHandlerInfo = GetType().GetMethod(nameof(CallHandler), BindingFlags.NonPublic | BindingFlags.Instance)!; - _registry = registry; + _isolationService = isolationService; } public async Task HandleMessageAsync(MessageContext context) @@ -52,18 +48,10 @@ public async Task HandleMessageAsync(MessageContext context) { _messageMetadataAccessor.SetData(context); - if (_serviceBusOptions.Settings.UseIsolation) + var shouldHandleMessage = await _isolationService.HandleIsolation(context); + if (!shouldHandleMessage) { - var expectedIsolationKey = _serviceBusOptions.Settings.IsolationKey; - - var receivedIsolationKey = context.IsolationKey - ?? string.Empty; - - if (receivedIsolationKey != expectedIsolationKey) - { - await HandleIsolationKeyMismatchAsync(context, expectedIsolationKey!, receivedIsolationKey); - return; - } + return; } var executionStartedArgs = new ExecutionStartedArgs(context); @@ -142,18 +130,6 @@ public async Task HandleMessageAsync(MessageContext context) } } - private async Task HandleIsolationKeyMismatchAsync(MessageContext context, string expectedKey, string receivedKey) - { - _logger.IgnoreMessage(expectedKey, receivedKey); - - var connectionSettings = _serviceBusOptions.Settings.ConnectionSettings; - - await context.CompleteAndResendMessageAsync( - _messageMetadataAccessor, - _registry, - connectionSettings!); - } - private IDisposable? AddLoggingContext(MessageContext context) { var executionContext = context.ReadExecutionContext(); diff --git a/src/Ev.ServiceBus/ServiceBusEngine.cs b/src/Ev.ServiceBus/ServiceBusEngine.cs index dd7e417..7a42eef 100644 --- a/src/Ev.ServiceBus/ServiceBusEngine.cs +++ b/src/Ev.ServiceBus/ServiceBusEngine.cs @@ -51,9 +51,13 @@ public async Task StartAll() _serviceBusEngineLogger.FailedToConnectToServiceBus(ex); } } - if (settings.UseIsolation && settings.IsolationKey == null) + if (settings.IsolationSettings.IsolationBehavior != IsolationBehavior.HandleAllMessages && settings.IsolationSettings.ApplicationName == null) { - throw new Exception("Isolation key must be set when isolation is enabled"); + throw new ConfigurationException($"IsolationSettings.ApplicationName must be set when {nameof(IsolationBehavior)} is {nameof(IsolationBehavior.HandleIsolatedMessages)} or {nameof(IsolationBehavior.HandleNonIsolatedMessages)}"); + } + if (settings.IsolationSettings is { IsolationBehavior: IsolationBehavior.HandleIsolatedMessages, IsolationKey: null }) + { + throw new ConfigurationException($"IsolationSettings.IsolationKey must be set when {nameof(IsolationBehavior)} is {nameof(IsolationBehavior.HandleIsolatedMessages)}"); } BuildSenders(); diff --git a/src/Ev.ServiceBus/ServiceCollectionExtensions.cs b/src/Ev.ServiceBus/ServiceCollectionExtensions.cs index 58dedc1..1c30e39 100644 --- a/src/Ev.ServiceBus/ServiceCollectionExtensions.cs +++ b/src/Ev.ServiceBus/ServiceCollectionExtensions.cs @@ -5,6 +5,7 @@ using Ev.ServiceBus.Abstractions.Listeners; using Ev.ServiceBus.Abstractions.MessageReception; using Ev.ServiceBus.Dispatch; +using Ev.ServiceBus.Isolation; using Ev.ServiceBus.Management; using Ev.ServiceBus.Reception; using Microsoft.Extensions.DependencyInjection; @@ -52,6 +53,7 @@ private static void RegisterBaseServices(IServiceCollection services) RegisterMessageReceptionServices(services); services.TryAddScoped(); + services.TryAddScoped(); } private static void RegisterMessageReceptionServices(IServiceCollection services) diff --git a/tests/Ev.ServiceBus.TestHelpers/SenderMock.cs b/tests/Ev.ServiceBus.TestHelpers/SenderMock.cs index 54c84cf..2712a72 100644 --- a/tests/Ev.ServiceBus.TestHelpers/SenderMock.cs +++ b/tests/Ev.ServiceBus.TestHelpers/SenderMock.cs @@ -1,5 +1,6 @@ using System.Collections.Generic; using System.Threading; +using System.Threading.Tasks; using Azure.Messaging.ServiceBus; using Moq; @@ -7,6 +8,8 @@ namespace Ev.ServiceBus.TestHelpers; public class SenderMock { + public List MessagesSent { get; private set; } = new(); + public SenderMock(string queueOrTopicName) { QueueOrTopicName = queueOrTopicName; @@ -14,6 +17,13 @@ public SenderMock(string queueOrTopicName) Mock.SetupGet(o => o.EntityPath).Returns(queueOrTopicName); Mock.Setup(o => o.CreateMessageBatchAsync(It.IsAny())) .ReturnsAsync(ServiceBusModelFactory.ServiceBusMessageBatch(0, new List())); + Mock + .Setup(o => o.SendMessageAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask) + .Callback((ServiceBusMessage message, CancellationToken token) => + { + MessagesSent.Add(message); + }); } public string QueueOrTopicName { get; } diff --git a/tests/Ev.ServiceBus.UnitTests/DispatchConfigurationTest.cs b/tests/Ev.ServiceBus.UnitTests/Configuration/DispatchConfigurationTest.cs similarity index 99% rename from tests/Ev.ServiceBus.UnitTests/DispatchConfigurationTest.cs rename to tests/Ev.ServiceBus.UnitTests/Configuration/DispatchConfigurationTest.cs index 8bfbfa5..35106af 100644 --- a/tests/Ev.ServiceBus.UnitTests/DispatchConfigurationTest.cs +++ b/tests/Ev.ServiceBus.UnitTests/Configuration/DispatchConfigurationTest.cs @@ -13,7 +13,7 @@ using Xunit; using Composer = Ev.ServiceBus.UnitTests.Helpers.Composer; -namespace Ev.ServiceBus.UnitTests; +namespace Ev.ServiceBus.UnitTests.Configuration; public class DispatchConfigurationTest { diff --git a/tests/Ev.ServiceBus.UnitTests/ReceptionConfigurationTest.cs b/tests/Ev.ServiceBus.UnitTests/Configuration/ReceptionConfigurationTest.cs similarity index 99% rename from tests/Ev.ServiceBus.UnitTests/ReceptionConfigurationTest.cs rename to tests/Ev.ServiceBus.UnitTests/Configuration/ReceptionConfigurationTest.cs index 34404a4..c55238f 100644 --- a/tests/Ev.ServiceBus.UnitTests/ReceptionConfigurationTest.cs +++ b/tests/Ev.ServiceBus.UnitTests/Configuration/ReceptionConfigurationTest.cs @@ -13,7 +13,7 @@ using Xunit; using Composer = Ev.ServiceBus.UnitTests.Helpers.Composer; -namespace Ev.ServiceBus.UnitTests; +namespace Ev.ServiceBus.UnitTests.Configuration; public class ReceptionConfigurationTest { diff --git a/tests/Ev.ServiceBus.UnitTests/ServiceBusSettingsTests.cs b/tests/Ev.ServiceBus.UnitTests/Configuration/ServiceBusSettingsTests.cs similarity index 96% rename from tests/Ev.ServiceBus.UnitTests/ServiceBusSettingsTests.cs rename to tests/Ev.ServiceBus.UnitTests/Configuration/ServiceBusSettingsTests.cs index 97cc0be..1e3780e 100644 --- a/tests/Ev.ServiceBus.UnitTests/ServiceBusSettingsTests.cs +++ b/tests/Ev.ServiceBus.UnitTests/Configuration/ServiceBusSettingsTests.cs @@ -7,7 +7,7 @@ using Microsoft.Extensions.Options; using Xunit; -namespace Ev.ServiceBus.UnitTests; +namespace Ev.ServiceBus.UnitTests.Configuration; public class ServiceBusSettingsTests { diff --git a/tests/Ev.ServiceBus.UnitTests/DeactivatedSenderTest.cs b/tests/Ev.ServiceBus.UnitTests/Core/DeactivatedSenderTest.cs similarity index 98% rename from tests/Ev.ServiceBus.UnitTests/DeactivatedSenderTest.cs rename to tests/Ev.ServiceBus.UnitTests/Core/DeactivatedSenderTest.cs index 9bd7e22..4376447 100644 --- a/tests/Ev.ServiceBus.UnitTests/DeactivatedSenderTest.cs +++ b/tests/Ev.ServiceBus.UnitTests/Core/DeactivatedSenderTest.cs @@ -9,7 +9,7 @@ using Microsoft.Extensions.DependencyInjection; using Xunit; -namespace Ev.ServiceBus.UnitTests; +namespace Ev.ServiceBus.UnitTests.Core; public class DeactivatedSenderTest { diff --git a/tests/Ev.ServiceBus.UnitTests/EventListenerTests.cs b/tests/Ev.ServiceBus.UnitTests/Core/EventListenerTests.cs similarity index 97% rename from tests/Ev.ServiceBus.UnitTests/EventListenerTests.cs rename to tests/Ev.ServiceBus.UnitTests/Core/EventListenerTests.cs index 07dbde0..65d2dcb 100644 --- a/tests/Ev.ServiceBus.UnitTests/EventListenerTests.cs +++ b/tests/Ev.ServiceBus.UnitTests/Core/EventListenerTests.cs @@ -1,204 +1,204 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Ev.ServiceBus.Abstractions; -using Ev.ServiceBus.Exceptions; -using Ev.ServiceBus.UnitTests.Helpers; -using FluentAssertions; -using FluentAssertions.Execution; -using Microsoft.Extensions.DependencyInjection; -using Moq; -using Xunit; - -namespace Ev.ServiceBus.UnitTests; - -public class EventListenerTests -{ - [Fact] - public async Task CanListenToQueueEvents() - { - var mock = new Mock(); - var composer = new Composer(); - - composer.WithAdditionalOptions(builder => - { - builder.RegisterEventListener(); - }); - - composer.WithAdditionalServices(services => - { - services.AddSingleton(mock); - services.RegisterServiceBusReception().FromQueue("testQueue", builder => - { - builder.RegisterReception(); - }); - - }); - await composer.Compose(); - - var clientMock = composer.Provider.GetProcessorMock("testQueue"); - var result = composer.Provider.GetRequiredService().SerializeBody(new SubscribedEvent()); - var message = MessageHelper.CreateMessage(result.ContentType, result.Body, nameof(SubscribedEvent)); - - await clientMock.TriggerMessageReception(message, CancellationToken.None); - - mock.Verify(o => o.OnExecutionStart(It.Is(e => e.ClientType == ClientType.Queue)), Times.Once); - mock.Verify(o => o.OnExecutionSuccess(It.Is(e => e.ClientType == ClientType.Queue && e.ExecutionDurationMilliseconds > 0)), Times.Once); - mock.Verify(o => o.OnExecutionFailed(It.IsAny()), Times.Never); - } - - [Fact] - public async Task CanListenToQueueEventsWhenThrowingExceptions() - { - var mock = new Mock(); - var composer = new Composer(); - - composer.WithAdditionalOptions(builder => - { - builder.RegisterEventListener(); - }); - - composer.WithAdditionalServices(services => - { - services.AddSingleton(mock); - services.RegisterServiceBusReception().FromQueue("testQueue", builder => - { - builder.RegisterReception(); - }); - - }); - await composer.Compose(); - - var clientMock = composer.Provider.GetProcessorMock("testQueue"); - var result = composer.Provider.GetRequiredService().SerializeBody(new SubscribedEvent()); - var message = MessageHelper.CreateMessage(result.ContentType, result.Body, nameof(SubscribedEvent)); - message.MessageId = Guid.NewGuid().ToString(); - - var action = async () => - { - await clientMock.TriggerMessageReception(message, CancellationToken.None); - }; - - using (new AssertionScope()) - { - var exception = await action.Should().ThrowAsync(); - exception.WithInnerException(); - exception.And.ClientType.Should().Be("Queue"); - exception.And.ResourceId.Should().Be("testQueue"); - exception.And.MessageId.Should().Be(message.MessageId); - exception.And.HandlerName.Should().Be("Ev.ServiceBus.UnitTests.Helpers.SubscribedEventThrowingHandler"); - exception.And.SessionId.Should().Be("none"); - exception.And.PayloadTypeId.Should().Be("SubscribedEvent"); - mock.Verify(o => o.OnExecutionStart(It.Is(e => e.ClientType == ClientType.Queue)), Times.Once); - mock.Verify(o => o.OnExecutionSuccess(It.IsAny()), Times.Never); - mock.Verify(o => o.OnExecutionFailed(It.Is(e => e.ClientType == ClientType.Queue && e.Exception is ArgumentOutOfRangeException)), Times.Once); - } - } - - [Fact] - public async Task CanListenToSubscriptionEvents() - { - var mock = new Mock(); - var composer = new Composer(); - - composer.WithAdditionalOptions(builder => - { - builder.RegisterEventListener(); - }); - - composer.WithAdditionalServices(services => - { - services.AddSingleton(mock); - services.RegisterServiceBusReception().FromSubscription("testTopic", "testSubscription", builder => - { - builder.RegisterReception(); - }); - - }); - await composer.Compose(); - - var clientMock = composer.Provider.GetProcessorMock("testTopic", "testSubscription"); - var result = composer.Provider.GetRequiredService().SerializeBody(new SubscribedEvent()); - var message = MessageHelper.CreateMessage(result.ContentType, result.Body, nameof(SubscribedEvent)); - - await clientMock.TriggerMessageReception(message, CancellationToken.None); - - mock.Verify(o => o.OnExecutionStart(It.Is(e => e.ClientType == ClientType.Subscription)), Times.Once); - mock.Verify(o => o.OnExecutionSuccess(It.Is(e => e.ClientType == ClientType.Subscription && e.ExecutionDurationMilliseconds > 0)), Times.Once); - mock.Verify(o => o.OnExecutionFailed(It.IsAny()), Times.Never); - } - - [Fact] - public async Task CanListenToSubscriptionEventsWhenThrowingExceptions() - { - var mock = new Mock(); - var composer = new Composer(); - - composer.WithAdditionalOptions(builder => - { - builder.RegisterEventListener(); - }); - - composer.WithAdditionalServices(services => - { - services.AddSingleton(mock); - services.RegisterServiceBusReception().FromSubscription("testTopic", "testSubscription", builder => - { - builder.RegisterReception(); - }); - - }); - await composer.Compose(); - - var clientMock = composer.Provider.GetProcessorMock("testTopic", "testSubscription"); - var result = composer.Provider.GetRequiredService().SerializeBody(new SubscribedEvent()); - var message = MessageHelper.CreateMessage(result.ContentType, result.Body, nameof(SubscribedEvent)); - message.MessageId = Guid.NewGuid().ToString(); - - var action = async () => - { - await clientMock.TriggerMessageReception(message, CancellationToken.None); - }; - - using (new AssertionScope()) - { - var exception = await action.Should().ThrowAsync(); - exception.WithInnerException(); - exception.And.ClientType.Should().Be("Subscription"); - exception.And.ResourceId.Should().Be("testTopic/Subscriptions/testSubscription"); - exception.And.MessageId.Should().Be(message.MessageId); - exception.And.HandlerName.Should().Be("Ev.ServiceBus.UnitTests.Helpers.SubscribedEventThrowingHandler"); - exception.And.SessionId.Should().Be("none"); - exception.And.PayloadTypeId.Should().Be("SubscribedEvent"); - mock.Verify(o => o.OnExecutionStart(It.Is(e => e.ClientType == ClientType.Subscription)), Times.Once); - mock.Verify(o => o.OnExecutionSuccess(It.IsAny()), Times.Never); - mock.Verify(o => o.OnExecutionFailed(It.Is(e => e.ClientType == ClientType.Subscription && e.Exception is ArgumentOutOfRangeException)), Times.Once); - } - } - -} - -public class FakeListener : IServiceBusEventListener -{ - public Mock Mock { get; } - - public FakeListener(Mock mock) - { - Mock = mock; - } - - public Task OnExecutionStart(ExecutionStartedArgs args) - { - return Mock.Object.OnExecutionStart(args); - } - - public Task OnExecutionSuccess(ExecutionSucceededArgs args) - { - return Mock.Object.OnExecutionSuccess(args); - } - - public Task OnExecutionFailed(ExecutionFailedArgs args) - { - return Mock.Object.OnExecutionFailed(args); - } +using System; +using System.Threading; +using System.Threading.Tasks; +using Ev.ServiceBus.Abstractions; +using Ev.ServiceBus.Exceptions; +using Ev.ServiceBus.UnitTests.Helpers; +using FluentAssertions; +using FluentAssertions.Execution; +using Microsoft.Extensions.DependencyInjection; +using Moq; +using Xunit; + +namespace Ev.ServiceBus.UnitTests.Core; + +public class EventListenerTests +{ + [Fact] + public async Task CanListenToQueueEvents() + { + var mock = new Mock(); + var composer = new Composer(); + + composer.WithAdditionalOptions(builder => + { + builder.RegisterEventListener(); + }); + + composer.WithAdditionalServices(services => + { + services.AddSingleton(mock); + services.RegisterServiceBusReception().FromQueue("testQueue", builder => + { + builder.RegisterReception(); + }); + + }); + await composer.Compose(); + + var clientMock = composer.Provider.GetProcessorMock("testQueue"); + var result = composer.Provider.GetRequiredService().SerializeBody(new SubscribedEvent()); + var message = MessageHelper.CreateMessage(result.ContentType, result.Body, nameof(SubscribedEvent)); + + await clientMock.TriggerMessageReception(message, CancellationToken.None); + + mock.Verify(o => o.OnExecutionStart(It.Is(e => e.ClientType == ClientType.Queue)), Times.Once); + mock.Verify(o => o.OnExecutionSuccess(It.Is(e => e.ClientType == ClientType.Queue && e.ExecutionDurationMilliseconds > 0)), Times.Once); + mock.Verify(o => o.OnExecutionFailed(It.IsAny()), Times.Never); + } + + [Fact] + public async Task CanListenToQueueEventsWhenThrowingExceptions() + { + var mock = new Mock(); + var composer = new Composer(); + + composer.WithAdditionalOptions(builder => + { + builder.RegisterEventListener(); + }); + + composer.WithAdditionalServices(services => + { + services.AddSingleton(mock); + services.RegisterServiceBusReception().FromQueue("testQueue", builder => + { + builder.RegisterReception(); + }); + + }); + await composer.Compose(); + + var clientMock = composer.Provider.GetProcessorMock("testQueue"); + var result = composer.Provider.GetRequiredService().SerializeBody(new SubscribedEvent()); + var message = MessageHelper.CreateMessage(result.ContentType, result.Body, nameof(SubscribedEvent)); + message.MessageId = Guid.NewGuid().ToString(); + + var action = async () => + { + await clientMock.TriggerMessageReception(message, CancellationToken.None); + }; + + using (new AssertionScope()) + { + var exception = await action.Should().ThrowAsync(); + exception.WithInnerException(); + exception.And.ClientType.Should().Be("Queue"); + exception.And.ResourceId.Should().Be("testQueue"); + exception.And.MessageId.Should().Be(message.MessageId); + exception.And.HandlerName.Should().Be("Ev.ServiceBus.UnitTests.Helpers.SubscribedEventThrowingHandler"); + exception.And.SessionId.Should().Be("none"); + exception.And.PayloadTypeId.Should().Be("SubscribedEvent"); + mock.Verify(o => o.OnExecutionStart(It.Is(e => e.ClientType == ClientType.Queue)), Times.Once); + mock.Verify(o => o.OnExecutionSuccess(It.IsAny()), Times.Never); + mock.Verify(o => o.OnExecutionFailed(It.Is(e => e.ClientType == ClientType.Queue && e.Exception is ArgumentOutOfRangeException)), Times.Once); + } + } + + [Fact] + public async Task CanListenToSubscriptionEvents() + { + var mock = new Mock(); + var composer = new Composer(); + + composer.WithAdditionalOptions(builder => + { + builder.RegisterEventListener(); + }); + + composer.WithAdditionalServices(services => + { + services.AddSingleton(mock); + services.RegisterServiceBusReception().FromSubscription("testTopic", "testSubscription", builder => + { + builder.RegisterReception(); + }); + + }); + await composer.Compose(); + + var clientMock = composer.Provider.GetProcessorMock("testTopic", "testSubscription"); + var result = composer.Provider.GetRequiredService().SerializeBody(new SubscribedEvent()); + var message = MessageHelper.CreateMessage(result.ContentType, result.Body, nameof(SubscribedEvent)); + + await clientMock.TriggerMessageReception(message, CancellationToken.None); + + mock.Verify(o => o.OnExecutionStart(It.Is(e => e.ClientType == ClientType.Subscription)), Times.Once); + mock.Verify(o => o.OnExecutionSuccess(It.Is(e => e.ClientType == ClientType.Subscription && e.ExecutionDurationMilliseconds > 0)), Times.Once); + mock.Verify(o => o.OnExecutionFailed(It.IsAny()), Times.Never); + } + + [Fact] + public async Task CanListenToSubscriptionEventsWhenThrowingExceptions() + { + var mock = new Mock(); + var composer = new Composer(); + + composer.WithAdditionalOptions(builder => + { + builder.RegisterEventListener(); + }); + + composer.WithAdditionalServices(services => + { + services.AddSingleton(mock); + services.RegisterServiceBusReception().FromSubscription("testTopic", "testSubscription", builder => + { + builder.RegisterReception(); + }); + + }); + await composer.Compose(); + + var clientMock = composer.Provider.GetProcessorMock("testTopic", "testSubscription"); + var result = composer.Provider.GetRequiredService().SerializeBody(new SubscribedEvent()); + var message = MessageHelper.CreateMessage(result.ContentType, result.Body, nameof(SubscribedEvent)); + message.MessageId = Guid.NewGuid().ToString(); + + var action = async () => + { + await clientMock.TriggerMessageReception(message, CancellationToken.None); + }; + + using (new AssertionScope()) + { + var exception = await action.Should().ThrowAsync(); + exception.WithInnerException(); + exception.And.ClientType.Should().Be("Subscription"); + exception.And.ResourceId.Should().Be("testTopic/Subscriptions/testSubscription"); + exception.And.MessageId.Should().Be(message.MessageId); + exception.And.HandlerName.Should().Be("Ev.ServiceBus.UnitTests.Helpers.SubscribedEventThrowingHandler"); + exception.And.SessionId.Should().Be("none"); + exception.And.PayloadTypeId.Should().Be("SubscribedEvent"); + mock.Verify(o => o.OnExecutionStart(It.Is(e => e.ClientType == ClientType.Subscription)), Times.Once); + mock.Verify(o => o.OnExecutionSuccess(It.IsAny()), Times.Never); + mock.Verify(o => o.OnExecutionFailed(It.Is(e => e.ClientType == ClientType.Subscription && e.Exception is ArgumentOutOfRangeException)), Times.Once); + } + } + +} + +public class FakeListener : IServiceBusEventListener +{ + public Mock Mock { get; } + + public FakeListener(Mock mock) + { + Mock = mock; + } + + public Task OnExecutionStart(ExecutionStartedArgs args) + { + return Mock.Object.OnExecutionStart(args); + } + + public Task OnExecutionSuccess(ExecutionSucceededArgs args) + { + return Mock.Object.OnExecutionSuccess(args); + } + + public Task OnExecutionFailed(ExecutionFailedArgs args) + { + return Mock.Object.OnExecutionFailed(args); + } } \ No newline at end of file diff --git a/tests/Ev.ServiceBus.UnitTests/QueueClientHandlingTest.cs b/tests/Ev.ServiceBus.UnitTests/Core/QueueClientHandlingTest.cs similarity index 99% rename from tests/Ev.ServiceBus.UnitTests/QueueClientHandlingTest.cs rename to tests/Ev.ServiceBus.UnitTests/Core/QueueClientHandlingTest.cs index 52f508f..7b262b6 100644 --- a/tests/Ev.ServiceBus.UnitTests/QueueClientHandlingTest.cs +++ b/tests/Ev.ServiceBus.UnitTests/Core/QueueClientHandlingTest.cs @@ -6,7 +6,7 @@ using Moq; using Xunit; -namespace Ev.ServiceBus.UnitTests; +namespace Ev.ServiceBus.UnitTests.Core; public class QueueClientHandlingTest { diff --git a/tests/Ev.ServiceBus.UnitTests/SubscriptionClientHandlingTest.cs b/tests/Ev.ServiceBus.UnitTests/Core/SubscriptionClientHandlingTest.cs similarity index 97% rename from tests/Ev.ServiceBus.UnitTests/SubscriptionClientHandlingTest.cs rename to tests/Ev.ServiceBus.UnitTests/Core/SubscriptionClientHandlingTest.cs index 103f7d3..403efe6 100644 --- a/tests/Ev.ServiceBus.UnitTests/SubscriptionClientHandlingTest.cs +++ b/tests/Ev.ServiceBus.UnitTests/Core/SubscriptionClientHandlingTest.cs @@ -6,7 +6,7 @@ using FluentAssertions; using Xunit; -namespace Ev.ServiceBus.UnitTests; +namespace Ev.ServiceBus.UnitTests.Core; public class SubscriptionClientHandlingTest { diff --git a/tests/Ev.ServiceBus.UnitTests/TopicClientHandlingTest.cs b/tests/Ev.ServiceBus.UnitTests/Core/TopicClientHandlingTest.cs similarity index 99% rename from tests/Ev.ServiceBus.UnitTests/TopicClientHandlingTest.cs rename to tests/Ev.ServiceBus.UnitTests/Core/TopicClientHandlingTest.cs index eae6b21..c0dd903 100644 --- a/tests/Ev.ServiceBus.UnitTests/TopicClientHandlingTest.cs +++ b/tests/Ev.ServiceBus.UnitTests/Core/TopicClientHandlingTest.cs @@ -6,7 +6,7 @@ using Moq; using Xunit; -namespace Ev.ServiceBus.UnitTests; +namespace Ev.ServiceBus.UnitTests.Core; public class TopicClientHandlingTest { diff --git a/tests/Ev.ServiceBus.UnitTests/DispatchTest.cs b/tests/Ev.ServiceBus.UnitTests/DispatchTest.cs index b65ee7c..ef65051 100644 --- a/tests/Ev.ServiceBus.UnitTests/DispatchTest.cs +++ b/tests/Ev.ServiceBus.UnitTests/DispatchTest.cs @@ -560,8 +560,7 @@ public async Task ShouldAssignIsolationKeyWhenInIsolationMode() composer.WithDefaultSettings(settings => { settings.WithConnection("Endpoint=testConnectionString;", new ServiceBusClientOptions()); - settings.UseIsolation = true; - settings.IsolationKey = isolationKey; + settings.WithIsolation(IsolationBehavior.HandleIsolatedMessages, isolationKey, "My.Application"); }); SetupComposer(composer); await composer.Compose(); diff --git a/tests/Ev.ServiceBus.UnitTests/Ev.ServiceBus.UnitTests.csproj b/tests/Ev.ServiceBus.UnitTests/Ev.ServiceBus.UnitTests.csproj index a2e002a..ba5ac9a 100644 --- a/tests/Ev.ServiceBus.UnitTests/Ev.ServiceBus.UnitTests.csproj +++ b/tests/Ev.ServiceBus.UnitTests/Ev.ServiceBus.UnitTests.csproj @@ -4,6 +4,7 @@ false net8.0 CS0618 + enable diff --git a/tests/Ev.ServiceBus.UnitTests/IsolationTests.cs b/tests/Ev.ServiceBus.UnitTests/IsolationTests.cs new file mode 100644 index 0000000..999b47d --- /dev/null +++ b/tests/Ev.ServiceBus.UnitTests/IsolationTests.cs @@ -0,0 +1,361 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using Ev.ServiceBus.Abstractions; +using Ev.ServiceBus.Reception; +using Ev.ServiceBus.TestHelpers; +using Ev.ServiceBus.UnitTests.Helpers; +using FluentAssertions; +using Microsoft.Extensions.DependencyInjection; +using Xunit; + +namespace Ev.ServiceBus.UnitTests; + +public class IsolationTests +{ + [Theory] + [InlineData(null, null)] + [InlineData(null, "Test.Application")] + [InlineData("AnotherIsolationKey", null)] + [InlineData("AnotherIsolationKey", "Test.Application")] + [InlineData("AnotherIsolationKey", "Another.Application,Test.Application")] + [InlineData("MyIsolationKey", "Another.Application")] + [InlineData("MyIsolationKey", null)] + public async Task When_HandleIsolatedMessage_DoesntHandleAndResendMessagesFromDifferentInstance(string? givenIsolationKey, string? givenIsolationApps) + { + var composer = new Composer(); + var eventStore = new EventStore(); + composer.WithDefaultSettings(settings => + { + settings.WithConnection("Endpoint=testConnectionString;", new ServiceBusClientOptions()); + settings.WithIsolation(IsolationBehavior.HandleIsolatedMessages, "MyIsolationKey", "Test.Application"); + }); + + composer.WithAdditionalServices( + services => + { + services.RegisterServiceBusReception() + .FromSubscription( + "testTopic", + "testSubscription", + builder => + { + builder.RegisterReception() + .CustomizePayloadTypeId("MyEvent"); + }); + + services.AddSingleton(eventStore); + }); + + await composer.Compose(); + var client = composer + .ClientFactory + .GetProcessorMock("testTopic", "testSubscription"); + + var message = TestMessageHelper.CreateEventMessage("myevent", new + { + SomeString = "hello", + SomeNumber = 36 + }); + message.ApplicationProperties[UserProperties.IsolationKey] = givenIsolationKey; + message.ApplicationProperties[UserProperties.IsolationApps] = givenIsolationApps; + + await client.TriggerMessageReception(message, CancellationToken.None); + var @event = eventStore.Events.FirstOrDefault(o => o.HandlerType == typeof(SubscribedPayloadHandler)); + @event.Should().BeNull(); + + var messageSent = composer.ClientFactory.GetSenderMock("testTopic").MessagesSent.SingleOrDefault(); + messageSent.Should().NotBeNull(); + messageSent!.ApplicationProperties[UserProperties.IsolationKey].Should().Be(givenIsolationKey); + messageSent.ApplicationProperties[UserProperties.IsolationApps].Should().Be(givenIsolationApps); + messageSent.ApplicationProperties[UserProperties.PayloadTypeIdProperty].Should().Be("myevent"); + } + + [Theory] + [InlineData(null, null)] + [InlineData(null, "Test.Application")] + [InlineData("AnotherIsolationKey", null)] + [InlineData("AnotherIsolationKey", "Test.Application")] + [InlineData("AnotherIsolationKey", "Another.Application,Test.Application")] + [InlineData("MyIsolationKey", "Another.Application")] + [InlineData("MyIsolationKey", null)] + public async Task When_HandleIsolatedMessage_DoesntHandleAndResendMessagesFromDifferentInstance_ForQueues(string? givenIsolationKey, string? givenIsolationApps) + { + var composer = new Composer(); + var eventStore = new EventStore(); + composer.WithDefaultSettings(settings => + { + settings.WithConnection("Endpoint=testConnectionString;", new ServiceBusClientOptions()); + settings.WithIsolation(IsolationBehavior.HandleIsolatedMessages, "MyIsolationKey", "Test.Application"); + }); + + composer.WithAdditionalServices( + services => + { + services.RegisterServiceBusReception() + .FromQueue( + "testQueue", + builder => + { + builder.RegisterReception() + .CustomizePayloadTypeId("MyEvent"); + }); + + services.RegisterServiceBusDispatch() + .ToQueue( + "testQueue", + builder => + { + builder.RegisterDispatch(); + }); + + services.AddSingleton(eventStore); + }); + + await composer.Compose(); + var client = composer.ClientFactory.GetProcessorMock("testQueue"); + + var message = TestMessageHelper.CreateEventMessage("myevent", new + { + SomeString = "hello", + SomeNumber = 36 + }); + message.ApplicationProperties[UserProperties.IsolationKey] = givenIsolationKey; + message.ApplicationProperties[UserProperties.IsolationApps] = givenIsolationApps; + + await client.TriggerMessageReception(message, CancellationToken.None); + var @event = eventStore.Events.FirstOrDefault(o => o.HandlerType == typeof(SubscribedPayloadHandler)); + @event.Should().BeNull(); + + var messageSent = composer.ClientFactory.GetSenderMock("testQueue").MessagesSent.SingleOrDefault(); + messageSent.Should().NotBeNull(); + messageSent!.ApplicationProperties[UserProperties.IsolationKey].Should().Be(givenIsolationKey); + messageSent.ApplicationProperties[UserProperties.IsolationApps].Should().Be(givenIsolationApps); + messageSent.ApplicationProperties[UserProperties.PayloadTypeIdProperty].Should().Be("myevent"); + } + + [Theory] + [InlineData("MyIsolationKey", "Test.Application")] + [InlineData("MyIsolationKey", "Test.Application,Another.Application")] + public async Task When_HandleIsolatedMessage_HandleIsolatedMessages(string? givenIsolationKey, string? givenIsolationApps) + { + var composer = new Composer(); + var eventStore = new EventStore(); + composer.WithDefaultSettings(settings => + { + settings.WithConnection("Endpoint=testConnectionString;", new ServiceBusClientOptions()); + settings.WithIsolation(IsolationBehavior.HandleIsolatedMessages, "MyIsolationKey", "Test.Application"); + }); + + composer.WithAdditionalServices( + services => + { + services.RegisterServiceBusReception() + .FromSubscription( + "testTopic", + "testSubscription", + builder => + { + builder.RegisterReception() + .CustomizePayloadTypeId("MyEvent"); + }); + + services.AddSingleton(eventStore); + }); + + await composer.Compose(); + var client = composer + .ClientFactory + .GetProcessorMock("testTopic", "testSubscription"); + + var message = TestMessageHelper.CreateEventMessage("myevent", new + { + SomeString = "hello", + SomeNumber = 36 + }); + message.ApplicationProperties[UserProperties.IsolationKey] = givenIsolationKey; + message.ApplicationProperties[UserProperties.IsolationApps] = givenIsolationApps; + + await client.TriggerMessageReception(message, CancellationToken.None); + var @event = eventStore.Events.FirstOrDefault(o => o.HandlerType == typeof(SubscribedPayloadHandler)); + @event.Should().NotBeNull(); + } + + [Theory] + [InlineData("MyIsolationKey", "Test.Application")] + [InlineData("MyIsolationKey", "Test.Application,Another.Application")] + public async Task When_HandleIsolatedMessage_HandleMessagesIsolatedMessages_ForQueues(string? givenIsolationKey, string? givenIsolationApps) + { + var composer = new Composer(); + var eventStore = new EventStore(); + composer.WithDefaultSettings(settings => + { + settings.WithConnection("Endpoint=testConnectionString;", new ServiceBusClientOptions()); + settings.WithIsolation(IsolationBehavior.HandleIsolatedMessages, "MyIsolationKey", "Test.Application"); + }); + + composer.WithAdditionalServices( + services => + { + services.RegisterServiceBusReception() + .FromQueue( + "testQueue", + builder => + { + builder.RegisterReception() + .CustomizePayloadTypeId("MyEvent"); + }); + + services.AddSingleton(eventStore); + }); + + await composer.Compose(); + var client = composer.ClientFactory.GetProcessorMock("testQueue"); + + var message = TestMessageHelper.CreateEventMessage("myevent", new + { + SomeString = "hello", + SomeNumber = 36 + }); + message.ApplicationProperties[UserProperties.IsolationKey] = givenIsolationKey; + message.ApplicationProperties[UserProperties.IsolationApps] = givenIsolationApps; + + await client.TriggerMessageReception(message, CancellationToken.None); + var @event = eventStore.Events.FirstOrDefault(o => o.HandlerType == typeof(SubscribedPayloadHandler)); + @event.Should().NotBeNull(); + } + + [Theory] + [InlineData("AnotherIsolationKey", "Test.Application")] + [InlineData("AnotherIsolationKey", "Another.Application,Test.Application")] + [InlineData("MyIsolationKey", "Test.Application")] + public async Task When_HandleNonIsolatedMessages_DoesntHandleAndResendMessagesFromDifferentInstance( + string? givenIsolationKey, + string? givenIsolationApps) + { + var composer = new Composer(); + var eventStore = new EventStore(); + composer.WithDefaultSettings(settings => + { + settings.WithConnection("Endpoint=testConnectionString;", new ServiceBusClientOptions()); + settings.WithIsolation(IsolationBehavior.HandleNonIsolatedMessages, null, "Test.Application"); + }); + + composer.WithAdditionalServices( + services => + { + services.RegisterServiceBusReception() + .FromSubscription( + "testTopic", + "testSubscription", + builder => + { + builder.RegisterReception() + .CustomizePayloadTypeId("MyEvent"); + }); + + services.AddSingleton(eventStore); + }); + + await composer.Compose(); + var client = composer + .ClientFactory + .GetProcessorMock("testTopic", "testSubscription"); + + var message = TestMessageHelper.CreateEventMessage("myevent", new + { + SomeString = "hello", + SomeNumber = 36 + }); + message.ApplicationProperties[UserProperties.IsolationKey] = givenIsolationKey; + message.ApplicationProperties[UserProperties.IsolationApps] = givenIsolationApps; + + await client.TriggerMessageReception(message, CancellationToken.None); + var @event = eventStore.Events.FirstOrDefault(o => o.HandlerType == typeof(SubscribedPayloadHandler)); + @event.Should().BeNull(); + + var messageSent = composer.ClientFactory.GetSenderMock("testTopic").MessagesSent.SingleOrDefault(); + messageSent.Should().NotBeNull(); + messageSent!.ApplicationProperties[UserProperties.IsolationKey].Should().Be(givenIsolationKey); + messageSent.ApplicationProperties[UserProperties.IsolationApps].Should().Be(givenIsolationApps); + messageSent.ApplicationProperties[UserProperties.PayloadTypeIdProperty].Should().Be("myevent"); + } + + [Theory] + [InlineData(null, null)] + [InlineData(null, "Test.Application")] + [InlineData("AnotherIsolationKey", null)] + [InlineData("MyIsolationKey", "Another.Application")] + [InlineData("MyIsolationKey", null)] + public async Task When_HandleNonIsolatedMessage_HandleNonIsolatedMessages(string? givenIsolationKey, string? givenIsolationApps) + { + var composer = new Composer(); + var eventStore = new EventStore(); + composer.WithDefaultSettings(settings => + { + settings.WithConnection("Endpoint=testConnectionString;", new ServiceBusClientOptions()); + settings.WithIsolation(IsolationBehavior.HandleNonIsolatedMessages, null, "Test.Application"); + }); + + composer.WithAdditionalServices( + services => + { + services.RegisterServiceBusReception() + .FromSubscription( + "testTopic", + "testSubscription", + builder => + { + builder.RegisterReception() + .CustomizePayloadTypeId("MyEvent"); + }); + + services.AddSingleton(eventStore); + }); + + await composer.Compose(); + var client = composer + .ClientFactory + .GetProcessorMock("testTopic", "testSubscription"); + + var message = TestMessageHelper.CreateEventMessage("myevent", new + { + SomeString = "hello", + SomeNumber = 36 + }); + message.ApplicationProperties[UserProperties.IsolationKey] = givenIsolationKey; + message.ApplicationProperties[UserProperties.IsolationApps] = givenIsolationApps; + + await client.TriggerMessageReception(message, CancellationToken.None); + var @event = eventStore.Events.FirstOrDefault(o => o.HandlerType == typeof(SubscribedPayloadHandler)); + @event.Should().NotBeNull(); + } + + public class SubscribedPayloadHandler : StoringPayloadHandler + { + public SubscribedPayloadHandler(EventStore store) : base(store) { } + } + + public class FailingEventHandler : IMessageReceptionHandler + { + public Task Handle(SubscribedEvent @event, CancellationToken cancellationToken) + { + throw new ArgumentNullException(); + } + } + + public class CancellingHandler : IMessageReceptionHandler + { + public Task Handle(SubscribedEvent @event, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return Task.CompletedTask; + } + + throw new ArgumentNullException(); + } + } +} diff --git a/tests/Ev.ServiceBus.UnitTests/MessageContextExtensionsTests.cs b/tests/Ev.ServiceBus.UnitTests/MessageContextExtensionsTests.cs deleted file mode 100644 index 53712ad..0000000 --- a/tests/Ev.ServiceBus.UnitTests/MessageContextExtensionsTests.cs +++ /dev/null @@ -1,184 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Azure.Messaging.ServiceBus; -using Ev.ServiceBus.Abstractions; -using Ev.ServiceBus.Abstractions.MessageReception; -using Ev.ServiceBus.Management; -using Ev.ServiceBus.Reception.Extensions; -using Microsoft.Extensions.Options; -using Moq; -using Xunit; - -namespace Ev.ServiceBus.UnitTests -{ - public class MessageContextExtensionsTests - { - [Fact] - public async Task CompleteAndResendMessageAsync_ShouldCompleteAndResendMessageToCorrectQueue() - { - // Arrange - var queueName = "test-queue"; - var receivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(); - var receiverMock = new Mock(); - var args = new ProcessMessageEventArgs( - receivedMessage, - receiverMock.Object, - CancellationToken.None); - - var messageContext = new MessageContext( - args, - ClientType.Queue, - queueName); - - var metadataMock = new Mock(); - metadataMock.Setup(m => m.CompleteMessageAsync()).Returns(Task.CompletedTask); - - var metadataAccessorMock = new Mock(); - metadataAccessorMock.Setup(a => a.Metadata).Returns(metadataMock.Object); - - var senderMock = new Mock(); - senderMock.Setup(s => s.SendMessageAsync(It.IsAny(), It.IsAny())) - .Returns(Task.CompletedTask); - - // Create a mock for IMessageSender that will work with the registry - var messageSenderMock = new Mock(); - messageSenderMock.Setup(s => s.SendMessageAsync(It.IsAny(), It.IsAny())) - .Returns(Task.CompletedTask); - messageSenderMock.Setup(s => s.ClientType).Returns(ClientType.Queue); - messageSenderMock.Setup(s => s.Name).Returns(queueName); - - // Create the registry with mocked dependencies - var clientFactoryMock = new Mock(); - var optionsMock = new Mock>(); - optionsMock.Setup(o => o.Value).Returns(new ServiceBusOptions()); - - var registry = new ServiceBusRegistry(clientFactoryMock.Object, optionsMock.Object); - - // Register the IMessageSender mock with the registry - var registerMethod = typeof(ServiceBusRegistry).GetMethod("Register", - System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic, - null, - [typeof(IMessageSender)], - null); - - registerMethod!.Invoke(registry, [messageSenderMock.Object]); - - // Act - await messageContext.CompleteAndResendMessageAsync( - metadataAccessorMock.Object, - registry, - null!); - - // Assert - metadataMock.Verify(m => m.CompleteMessageAsync(), Times.Once); - messageSenderMock.Verify( - s => s.SendMessageAsync( - It.IsAny(), - It.IsAny()), - Times.Once); - } - - [Fact] - public async Task CompleteAndResendMessageAsync_WithSubscription_SendsToCorrectTopic() - { - // Arrange - var topicName = "test-topic"; - var subscriptionPath = $"{topicName}/Subscriptions/test-subscription"; - - var receivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(); - var receiverMock = new Mock(); - var args = new ProcessMessageEventArgs( - receivedMessage, - receiverMock.Object, - CancellationToken.None); - - var messageContext = new MessageContext( - args, - ClientType.Subscription, - subscriptionPath); - - var metadataMock = new Mock(); - metadataMock.Setup(m => m.CompleteMessageAsync()).Returns(Task.CompletedTask); - - var metadataAccessorMock = new Mock(); - metadataAccessorMock.Setup(a => a.Metadata).Returns(metadataMock.Object); - - // Create a mock for IMessageSender for the topic - var messageSenderMock = new Mock(); - messageSenderMock.Setup(s => s.SendMessageAsync(It.IsAny(), It.IsAny())) - .Returns(Task.CompletedTask); - messageSenderMock.Setup(s => s.ClientType).Returns(ClientType.Topic); - messageSenderMock.Setup(s => s.Name).Returns(topicName); - - // Create registry with mocked dependencies - var clientFactoryMock = new Mock(); - var optionsMock = new Mock>(); - optionsMock.Setup(o => o.Value).Returns(new ServiceBusOptions()); - - var registry = new ServiceBusRegistry(clientFactoryMock.Object, optionsMock.Object); - - // Register the IMessageSender mock with the registry - var registerMethod = typeof(ServiceBusRegistry).GetMethod("Register", - System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic, - null, - [typeof(IMessageSender)], - null); - - registerMethod!.Invoke(registry, [messageSenderMock.Object]); - - // Act - await messageContext.CompleteAndResendMessageAsync( - metadataAccessorMock.Object, - registry, - null!); - - // Assert - metadataMock.Verify(m => m.CompleteMessageAsync(), Times.Once); - messageSenderMock.Verify( - s => s.SendMessageAsync( - It.IsAny(), - It.IsAny()), - Times.Once); - } - - [Fact] - public async Task CompleteAndResendMessageAsync_WithInvalidClientType_ThrowsArgumentException() - { - // Arrange - var resourceId = "test-resource"; - var receivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(); - var receiverMock = new Mock(); - var args = new ProcessMessageEventArgs( - receivedMessage, - receiverMock.Object, - CancellationToken.None); - - var messageContext = new MessageContext( - args, - (ClientType)999, - resourceId); - - var metadataMock = new Mock(); - var metadataAccessorMock = new Mock(); - metadataAccessorMock.Setup(a => a.Metadata).Returns(metadataMock.Object); - - // Create registry with mocked dependencies - var clientFactoryMock = new Mock(); - var optionsMock = new Mock>(); - optionsMock.Setup(o => o.Value).Returns(new ServiceBusOptions()); - - var registry = new ServiceBusRegistry(clientFactoryMock.Object, optionsMock.Object); - - // Act & Assert - await Assert.ThrowsAsync(() => - messageContext.CompleteAndResendMessageAsync( - metadataAccessorMock.Object, - registry, - null!)); - - metadataMock.Verify(m => m.CompleteMessageAsync(), Times.Once); - // No need to verify registry operations as the exception occurs first - } - } -} \ No newline at end of file diff --git a/tests/Ev.ServiceBus.UnitTests/MessageHandlingTest.cs b/tests/Ev.ServiceBus.UnitTests/Reception/MessageHandlingTest.cs similarity index 98% rename from tests/Ev.ServiceBus.UnitTests/MessageHandlingTest.cs rename to tests/Ev.ServiceBus.UnitTests/Reception/MessageHandlingTest.cs index 6f4332e..addceb3 100644 --- a/tests/Ev.ServiceBus.UnitTests/MessageHandlingTest.cs +++ b/tests/Ev.ServiceBus.UnitTests/Reception/MessageHandlingTest.cs @@ -10,7 +10,7 @@ using Microsoft.Extensions.DependencyInjection; using Xunit; -namespace Ev.ServiceBus.UnitTests; +namespace Ev.ServiceBus.UnitTests.Reception; public class MessageHandlingTest { diff --git a/tests/Ev.ServiceBus.UnitTests/ReceptionTest.cs b/tests/Ev.ServiceBus.UnitTests/Reception/ReceptionTest.cs similarity index 88% rename from tests/Ev.ServiceBus.UnitTests/ReceptionTest.cs rename to tests/Ev.ServiceBus.UnitTests/Reception/ReceptionTest.cs index 99fe933..98ae4ad 100644 --- a/tests/Ev.ServiceBus.UnitTests/ReceptionTest.cs +++ b/tests/Ev.ServiceBus.UnitTests/Reception/ReceptionTest.cs @@ -16,7 +16,7 @@ using Moq; using Xunit; -namespace Ev.ServiceBus.UnitTests; +namespace Ev.ServiceBus.UnitTests.Reception; public class ReceptionTest { @@ -232,51 +232,6 @@ public async Task MaxAutoRenewDurationIsSet() client.Options.MaxAutoLockRenewalDuration.Should().Be(TimeSpan.FromMinutes(20)); } - [Fact] - public async Task DoesntReceiveMessagesFromDifferentInstance() - { - var isolationKey = "service-isolationKey"; - var composer = new Composer(); - var eventStore = new EventStore(); - composer.WithDefaultSettings(settings => - { - settings.WithConnection("Endpoint=testConnectionString;", new ServiceBusClientOptions()); - settings.UseIsolation = true; - settings.IsolationKey = isolationKey; - }); - - composer.WithAdditionalServices( - services => - { - services.RegisterServiceBusReception() - .FromSubscription( - "testTopic", - "testSubscription", - builder => - { - builder.RegisterReception() - .CustomizePayloadTypeId("MyEvent"); - }); - - services.AddSingleton(eventStore); - }); - - await composer.Compose(); - var client = composer - .ClientFactory - .GetProcessorMock("testTopic", "testSubscription"); - - var message = TestMessageHelper.CreateEventMessage("myevent", new - { - SomeString = "hello", - SomeNumber = 36 - }); - - await client.TriggerMessageReception(message, CancellationToken.None); - var @event = eventStore.Events.FirstOrDefault(o => o.HandlerType == typeof(SubscribedPayloadHandler)); - @event.Should().BeNull(); - } - private async Task InitSimpleTest() { var eventStore = new EventStore(); diff --git a/tests/Ev.ServiceBus.UnitTests/SessionHandlingTest.cs b/tests/Ev.ServiceBus.UnitTests/Reception/SessionHandlingTest.cs similarity index 98% rename from tests/Ev.ServiceBus.UnitTests/SessionHandlingTest.cs rename to tests/Ev.ServiceBus.UnitTests/Reception/SessionHandlingTest.cs index 5de369a..2cfc4dc 100644 --- a/tests/Ev.ServiceBus.UnitTests/SessionHandlingTest.cs +++ b/tests/Ev.ServiceBus.UnitTests/Reception/SessionHandlingTest.cs @@ -9,7 +9,7 @@ using Microsoft.Extensions.Hosting; using Xunit; -namespace Ev.ServiceBus.UnitTests; +namespace Ev.ServiceBus.UnitTests.Reception; public class SessionHandlingTest {