diff --git a/SimpleRabbit.NetCore/Service/BasicRabbitService.cs b/SimpleRabbit.NetCore/Service/BasicRabbitService.cs
index 8e90b7d..ee12d01 100644
--- a/SimpleRabbit.NetCore/Service/BasicRabbitService.cs
+++ b/SimpleRabbit.NetCore/Service/BasicRabbitService.cs
@@ -3,7 +3,9 @@
using System.Linq;
using System.Security.Authentication;
using System.Threading;
+using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
+using RabbitMQ.Client.Events;
namespace SimpleRabbit.NetCore
{
@@ -58,26 +60,87 @@ protected ConnectionFactory Factory
}
}
private string ClientName =>
- _config?.Name ??
+ _config?.Name ??
Environment.GetEnvironmentVariable("COMPUTERNAME") ??
Environment.GetEnvironmentVariable("HOSTNAME");
private readonly RabbitConfiguration _config;
+ private readonly ILogger _baseLogger;
protected BasicRabbitService(RabbitConfiguration config)
{
_config = config;
}
+ protected BasicRabbitService(RabbitConfiguration config, ILogger logger) : this(config)
+ {
+ _baseLogger = logger;
+ }
+
private IConnection _connection;
///
/// ClientName is used only for human reference from RabbitMQ UI.
///
- protected IConnection Connection => _connection ?? (_connection = Factory.CreateConnection(_hostnames, ClientName));
+ protected IConnection Connection
+ {
+ get
+ {
+ if (_connection == null)
+ {
+ _connection = Factory.CreateConnection(_hostnames, ClientName);
+
+ if (_connection is IAutorecoveringConnection autorecovering)
+ {
+ autorecovering.RecoverySucceeded += OnConnectionRecoverySucceeded;
+ autorecovering.ConnectionRecoveryError += OnConnectionRecoveryError;
+ }
+
+ _connection.ConnectionShutdown += OnConnectionShutdown;
+ }
+
+ return _connection;
+ }
+ }
private IModel _channel;
protected IModel Channel => _channel ?? (_channel = Connection.CreateModel());
+ private void OnConnectionRecoverySucceeded(object sender, EventArgs e)
+ {
+ _baseLogger?.LogInformation("RabbitMQ connection recovered successfully, invalidating channel");
+ lock (_lock)
+ {
+ // Invalidate the stale channel so it gets recreated on next access.
+ // The connection itself is still valid (it just recovered).
+ try
+ {
+ _channel?.Dispose();
+ }
+ catch
+ {
+ // Channel may already be disposed after recovery
+ }
+ _channel = null;
+ }
+ OnRecovered();
+ }
+
+ private void OnConnectionRecoveryError(object sender, ConnectionRecoveryErrorEventArgs e)
+ {
+ _baseLogger?.LogError(e.Exception, "RabbitMQ connection recovery failed");
+ }
+
+ private void OnConnectionShutdown(object sender, ShutdownEventArgs e)
+ {
+ _baseLogger?.LogWarning("RabbitMQ connection shutdown: {Reason}", e.ReplyText);
+ }
+
+ ///
+ /// Called when the connection has been automatically recovered.
+ /// Override to re-register consumers or perform other post-recovery actions.
+ ///
+ protected virtual void OnRecovered() { }
+
public IBasicProperties GetBasicProperties()
{
lock (_lock)
@@ -102,9 +165,24 @@ public void ClearConnection()
}
finally
{
- _connection?.Close();
- _connection?.Dispose();
- _connection = null;
+ try
+ {
+ if (_connection != null)
+ {
+ if (_connection is IAutorecoveringConnection autorecovering)
+ {
+ autorecovering.RecoverySucceeded -= OnConnectionRecoverySucceeded;
+ autorecovering.ConnectionRecoveryError -= OnConnectionRecoveryError;
+ }
+ _connection.ConnectionShutdown -= OnConnectionShutdown;
+ }
+ }
+ finally
+ {
+ _connection?.Close();
+ _connection?.Dispose();
+ _connection = null;
+ }
}
}
diff --git a/SimpleRabbit.NetCore/Service/QueueService.cs b/SimpleRabbit.NetCore/Service/QueueService.cs
index dfe8a15..efcb3c7 100644
--- a/SimpleRabbit.NetCore/Service/QueueService.cs
+++ b/SimpleRabbit.NetCore/Service/QueueService.cs
@@ -4,6 +4,7 @@
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Collections.Concurrent;
+using System.Threading;
using System.Threading.Tasks;
namespace SimpleRabbit.NetCore
@@ -21,17 +22,20 @@ public class QueueService : BasicRabbitService, IQueueService
private const int DefaultRetryInterval = 15;
private readonly ILogger _logger;
+ private readonly object _restartLock = new object();
private readonly Timer _timer;
private QueueConfiguration _queueServiceParams;
private IMessageHandler _handler;
private TimeSpan RetryInterval => TimeSpan.FromSeconds(_queueServiceParams?.RetryIntervalInSeconds ?? DefaultRetryInterval);
-
+
private int _retryCount;
+ private int _restarting;
+ private volatile bool _stopping;
private ConcurrentBag _toBeNackedMessages = new ConcurrentBag();
- public QueueService(RabbitConfiguration options, ILogger logger) : base(options)
+ public QueueService(RabbitConfiguration options, ILogger logger) : base(options, logger)
{
_logger = logger;
@@ -71,6 +75,7 @@ public void Start(QueueConfiguration subscriberConfiguration, IMessageHandler ha
private void Start()
{
Stop();
+ _stopping = false;
if (_handler == null)
{
throw new ArgumentNullException(nameof(_handler), $"No handler provided for {_queueServiceParams.ConsumerTag} => {_queueServiceParams.QueueName}");
@@ -80,9 +85,13 @@ private void Start()
{
var consumer = new AsyncEventingBasicConsumer(Channel);
consumer.Received += ReceiveEventAsync;
+ consumer.ConsumerCancelled += OnConsumerCancelled;
+ consumer.Shutdown += OnConsumerShutdown;
Channel.BasicQos(0, _queueServiceParams.PrefetchCount ?? 1, false);
Channel.BasicConsume(_queueServiceParams.QueueName, false, _queueServiceParams.DisplayName ?? _queueServiceParams.ConsumerTag, consumer);
+
+ _logger.LogInformation($"Consumer started on queue {_queueServiceParams.QueueName}");
}
catch (Exception e)
{
@@ -91,6 +100,36 @@ private void Start()
}
}
+ private Task OnConsumerCancelled(object sender, ConsumerEventArgs e)
+ {
+ if (_stopping) return Task.CompletedTask;
+ _logger.LogWarning($"Consumer cancelled by broker on queue {_queueServiceParams.QueueName}, tags: {string.Join(", ", e.ConsumerTags)}. Scheduling restart.");
+ RestartIn(RetryInterval);
+ return Task.CompletedTask;
+ }
+
+ private Task OnConsumerShutdown(object sender, ShutdownEventArgs e)
+ {
+ if (_stopping) return Task.CompletedTask;
+ _logger.LogWarning($"Consumer shutdown on queue {_queueServiceParams.QueueName}: {e.ReplyText}. Scheduling restart.");
+ RestartIn(RetryInterval);
+ return Task.CompletedTask;
+ }
+
+ ///
+ /// Called by BasicRabbitService when the connection has been automatically recovered.
+ /// Re-registers the consumer since the old channel/consumer is stale after recovery.
+ ///
+ protected override void OnRecovered()
+ {
+ if (_stopping) return;
+ _logger.LogInformation($"Connection recovered, restarting consumer on queue {_queueServiceParams?.QueueName}");
+ if (_queueServiceParams != null && _handler != null)
+ {
+ RestartIn(TimeSpan.FromSeconds(1));
+ }
+ }
+
private async Task ReceiveEventAsync(object sender, BasicDeliverEventArgs args)
{
var channel = (sender as AsyncEventingBasicConsumer)?.Model;
@@ -158,7 +197,17 @@ private void OnError(object sender, BasicDeliverEventArgs message)
default:
{
RestartIn(RetryInterval);
- channel.BasicNack(message.DeliveryTag, false, true);
+ try
+ {
+ if (!channel.IsClosed)
+ {
+ channel.BasicNack(message.DeliveryTag, false, true);
+ }
+ }
+ catch (Exception nackEx)
+ {
+ _logger.LogWarning(nackEx, $"Failed to nack message {message.DeliveryTag} on queue {_queueServiceParams.QueueName} (channel may be closed)");
+ }
return;
}
}
@@ -172,50 +221,71 @@ private void OnError(object sender, BasicDeliverEventArgs message)
private void RestartIn(TimeSpan waitInterval)
{
- if (_timer.Enabled)
+ // Use atomic compare-exchange to ensure only one thread enters the restart path.
+ if (Interlocked.CompareExchange(ref _restarting, 1, 0) != 0)
{
- // another message has already triggered an error.
+ // Another thread is already handling the restart.
return;
-
}
- if (_queueServiceParams.OnErrorAction == QueueConfiguration.ErrorAction.RestartConnection)
+ try
{
- try
- {
- //take note of blocking if clearing connection here
- //https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/341
- // attempt to stop the event consumption.
- if (Channel != null && !Channel.IsClosed)
- Channel?.BasicCancel(_queueServiceParams.ConsumerTag);
- }
- catch
+ if (_queueServiceParams.OnErrorAction == QueueConfiguration.ErrorAction.RestartConnection)
{
- // ignored
+ try
+ {
+ //take note of blocking if clearing connection here
+ //https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/341
+ // attempt to stop the event consumption.
+ if (Channel != null && !Channel.IsClosed)
+ Channel?.BasicCancel(_queueServiceParams.DisplayName ?? _queueServiceParams.ConsumerTag);
+ }
+ catch
+ {
+ // ignored
+ }
}
- }
- _retryCount++;
- var interval = waitInterval.TotalSeconds * (_queueServiceParams.AutoBackOff ? _retryCount : 1) % MaxRetryInterval;
+ _retryCount++;
+ var interval = waitInterval.TotalSeconds * (_queueServiceParams.AutoBackOff ? _retryCount : 1) % MaxRetryInterval;
- _timer.Interval = interval * 1000; // seconds
- _logger.LogInformation($" -> restarting in {interval} seconds ({_retryCount}).");
- _timer.Start();
+ _timer.Interval = interval * 1000; // seconds
+ _logger.LogInformation($" -> restarting in {interval} seconds ({_retryCount}).");
+ _timer.Start();
+ }
+ catch (Exception e)
+ {
+ _logger.LogError(e, $"Error scheduling restart for queue {_queueServiceParams.QueueName}");
+ // Reset the flag so a future attempt can try again
+ Interlocked.Exchange(ref _restarting, 0);
+ }
}
private void TimerActivation()
{
+ // Reset the restart flag so future errors can schedule a new restart.
+ Interlocked.Exchange(ref _restarting, 0);
+
switch (_queueServiceParams.OnErrorAction)
{
case QueueConfiguration.ErrorAction.NackOnException:
{
- foreach (var message in _toBeNackedMessages)
+ try
{
- if (Channel != null && !Channel.IsClosed)
+ foreach (var message in _toBeNackedMessages)
{
- Channel.BasicNack(message, false, true);
+ if (Channel != null && !Channel.IsClosed)
+ {
+ Channel.BasicNack(message, false, true);
+ }
}
-
+ }
+ catch (Exception e)
+ {
+ _logger.LogWarning(e, $"Failed to nack queued messages on {_queueServiceParams.QueueName}, scheduling restart");
+ _toBeNackedMessages.Clear();
+ RestartIn(RetryInterval);
+ return;
}
_toBeNackedMessages.Clear();
return;
@@ -231,7 +301,16 @@ private void TimerActivation()
public void Stop()
{
+ _stopping = true;
+ Interlocked.Exchange(ref _restarting, 0);
+ _timer.Stop();
Close();
}
+
+ protected override void Cleanup()
+ {
+ _timer?.Stop();
+ _timer?.Dispose();
+ }
}
}