Skip to content

Commit 1df4867

Browse files
author
Felipe Mattioli
committed
FEAT: Adding new changes
1 parent 08a95b3 commit 1df4867

5 files changed

Lines changed: 41 additions & 14 deletions

File tree

src/AzureServiceBusFlow/Abstractions/ICommandProducer.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,23 @@ namespace AzureServiceBusFlow.Abstractions;
44

55
public interface ICommandProducer<in TCommand> where TCommand : class, IServiceBusMessage
66
{
7+
/// <summary>
8+
/// Produces the command without additional options.
9+
/// </summary>
710
Task ProduceCommandAsync(TCommand command, CancellationToken cancellationToken);
811

12+
/// <summary>
13+
/// Produces the command using explicit message options such as delay or application properties.
14+
/// </summary>
915
Task ProduceCommandAsync(TCommand command, MessageOptions messageOptions, CancellationToken cancellationToken);
16+
17+
/// <summary>
18+
/// Produces the command with a set of application properties that will be attached to the message.
19+
/// </summary>
20+
Task ProduceCommandAsync(TCommand command, IDictionary<string, object> applicationProperties, CancellationToken cancellationToken);
21+
22+
/// <summary>
23+
/// Produces the command with a delivery delay before it becomes available for processing.
24+
/// </summary>
25+
Task ProduceCommandAsync(TCommand command, TimeSpan delay, CancellationToken cancellationToken);
1026
}

src/AzureServiceBusFlow/Abstractions/IServiceBusProducer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ Task ProduceAsync(
4747
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
4848
Task ProduceAsync(
4949
TMessage message,
50-
IDictionary<string, string> applicationProperties,
50+
IDictionary<string, object> applicationProperties,
5151
CancellationToken cancellationToken);
5252
}
5353
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
namespace AzureServiceBusFlow.Models
22
{
3-
public record MessageOptions(TimeSpan? Delay, IDictionary<string, string>? ApplicationProperties);
3+
public record MessageOptions(TimeSpan? Delay, IDictionary<string, object>? ApplicationProperties);
44
}

src/AzureServiceBusFlow/Producers/CommandProducer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public Task ProduceCommandAsync(TCommand command, TimeSpan delay, CancellationTo
3535
/// <summary>
3636
/// Produces a command with application properties.
3737
/// </summary>
38-
public Task ProduceCommandAsync(TCommand command, IDictionary<string, string> applicationProperties, CancellationToken cancellationToken)
38+
public Task ProduceCommandAsync(TCommand command, IDictionary<string, object> applicationProperties, CancellationToken cancellationToken)
3939
{
4040
return _producer.ProduceAsync(command, applicationProperties, cancellationToken);
4141
}

src/AzureServiceBusFlow/Producers/ServiceBusProducer.cs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,29 @@ public ServiceBusProducer(AzureServiceBusConfiguration azureServiceBusConfigurat
1515
{
1616
var client = new ServiceBusClient(azureServiceBusConfiguration.ConnectionString);
1717
_sender = client.CreateSender(queueOrTopicName);
18+
1819
_logger = logger;
1920
}
2021

2122
public async Task ProduceAsync(TMessage message, CancellationToken cancellationToken)
2223
{
23-
await ProduceAsync(message, producerOptions: null, cancellationToken);
24+
var json = JsonConvert.SerializeObject(message);
25+
var serviceBusMessage = new ServiceBusMessage(json)
26+
{
27+
Subject = message.RoutingKey,
28+
ApplicationProperties =
29+
{
30+
{ "MessageType", message.GetType().FullName },
31+
{ "CreatedAt", message.CreatedDate.ToString("O") }
32+
}
33+
};
34+
35+
await _sender.SendMessageAsync(serviceBusMessage, cancellationToken);
36+
37+
_logger.LogInformation("Message {MessageType} published with successfully!", message.GetType().Name);
2438
}
2539

26-
public async Task ProduceAsync(TMessage message, MessageOptions? producerOptions, CancellationToken cancellationToken)
40+
public Task ProduceAsync(TMessage message, MessageOptions producerOptions, CancellationToken cancellationToken)
2741
{
2842
var json = JsonConvert.SerializeObject(message);
2943
var serviceBusMessage = new ServiceBusMessage(json)
@@ -38,29 +52,26 @@ public async Task ProduceAsync(TMessage message, MessageOptions? producerOptions
3852

3953
if (producerOptions?.ApplicationProperties is not null)
4054
{
41-
foreach (var kvp in producerOptions.ApplicationProperties)
42-
{
43-
if (!serviceBusMessage.ApplicationProperties.ContainsKey(kvp.Key))
44-
serviceBusMessage.ApplicationProperties.Add(kvp.Key, kvp.Value);
45-
}
55+
producerOptions?.ApplicationProperties?
56+
.Where(kvp => !serviceBusMessage.ApplicationProperties.ContainsKey(kvp.Key))
57+
.ToList()
58+
.ForEach(kvp => serviceBusMessage.ApplicationProperties.Add(kvp.Key, kvp.Value));
4659
}
4760

4861
if (producerOptions?.Delay is not null)
4962
{
5063
serviceBusMessage.ScheduledEnqueueTime = DateTimeOffset.UtcNow.Add(producerOptions.Delay.Value);
5164
}
5265

53-
await _sender.SendMessageAsync(serviceBusMessage, cancellationToken);
54-
55-
_logger.LogInformation("Message {MessageType} published successfully!", message.GetType().Name);
66+
return _sender.SendMessageAsync(serviceBusMessage, cancellationToken);
5667
}
5768

5869
public Task ProduceAsync(TMessage message, TimeSpan delay, CancellationToken cancellationToken)
5970
{
6071
return ProduceAsync(message, new MessageOptions(delay, null), cancellationToken);
6172
}
6273

63-
public Task ProduceAsync(TMessage message, IDictionary<string, string> applicationProperties, CancellationToken cancellationToken)
74+
public Task ProduceAsync(TMessage message, IDictionary<string, object> applicationProperties, CancellationToken cancellationToken)
6475
{
6576
return ProduceAsync(message, new MessageOptions(null, applicationProperties), cancellationToken);
6677
}

0 commit comments

Comments
 (0)