diff --git a/SimpleRabbit.NetCore/Service/BasicRabbitService.cs b/SimpleRabbit.NetCore/Service/BasicRabbitService.cs
index 8e90b7d..ee12d01 100644
--- a/SimpleRabbit.NetCore/Service/BasicRabbitService.cs
+++ b/SimpleRabbit.NetCore/Service/BasicRabbitService.cs
@@ -3,7 +3,9 @@
 using System.Linq;
 using System.Security.Authentication;
 using System.Threading;
+using Microsoft.Extensions.Logging;
 using RabbitMQ.Client;
+using RabbitMQ.Client.Events;
 
 namespace SimpleRabbit.NetCore
 {
@@ -58,26 +60,100 @@ protected ConnectionFactory Factory
             }
         }
         private string ClientName =>
-            _config?.Name ?? 
+            _config?.Name ??
             Environment.GetEnvironmentVariable("COMPUTERNAME") ??
             Environment.GetEnvironmentVariable("HOSTNAME");
 
         private readonly RabbitConfiguration _config;
+        private readonly ILogger _baseLogger;
 
         protected BasicRabbitService(RabbitConfiguration config)
         {
             _config = config;
         }
 
+        /// <summary>
+        /// Creates the service with a logger used for connection lifecycle events (recovery, recovery errors, shutdown).
+        /// </summary>
+        protected BasicRabbitService(RabbitConfiguration config, ILogger logger) : this(config)
+        {
+            _baseLogger = logger;
+        }
+
         private IConnection _connection;
         /// <summary>
         /// ClientName is used only for human reference from RabbitMQ UI.
         /// </summary>
-        protected IConnection Connection => _connection ?? (_connection = Factory.CreateConnection(_hostnames, ClientName));
+        protected IConnection Connection
+        {
+            get
+            {
+                if (_connection == null)
+                {
+                    _connection = Factory.CreateConnection(_hostnames, ClientName);
+
+                    if (_connection is IAutorecoveringConnection autorecovering)
+                    {
+                        autorecovering.RecoverySucceeded += OnConnectionRecoverySucceeded;
+                        autorecovering.ConnectionRecoveryError += OnConnectionRecoveryError;
+                    }
+
+                    _connection.ConnectionShutdown += OnConnectionShutdown;
+                }
+
+                return _connection;
+            }
+        }
 
         private IModel _channel;
         protected IModel Channel => _channel ?? (_channel = Connection.CreateModel());
 
+        /// <summary>
+        /// Drops the cached channel after an automatic connection recovery, then calls <see cref="OnRecovered"/>.
+        /// </summary>
+        private void OnConnectionRecoverySucceeded(object sender, EventArgs e)
+        {
+            _baseLogger?.LogInformation("RabbitMQ connection recovered successfully, invalidating channel");
+            lock (_lock)
+            {
+                // Invalidate the stale channel so it gets recreated on next access.
+                // The connection itself is still valid (it just recovered).
+                try
+                {
+                    _channel?.Dispose();
+                }
+                catch (Exception ex)
+                {
+                    // Channel may already be disposed after recovery
+                    _baseLogger?.LogDebug(ex, "Disposing stale RabbitMQ channel after recovery failed");
+                }
+                _channel = null;
+            }
+            OnRecovered();
+        }
+
+        /// <summary>
+        /// Logs the exception reported for a failed automatic recovery attempt.
+        /// </summary>
+        private void OnConnectionRecoveryError(object sender, ConnectionRecoveryErrorEventArgs e)
+        {
+            _baseLogger?.LogError(e.Exception, "RabbitMQ connection recovery failed");
+        }
+
+        /// <summary>
+        /// Logs the reply text reported when the connection shuts down.
+        /// </summary>
+        private void OnConnectionShutdown(object sender, ShutdownEventArgs e)
+        {
+            _baseLogger?.LogWarning("RabbitMQ connection shutdown: {Reason}", e.ReplyText);
+        }
+
+        /// <summary>
+        /// Called when the connection has been automatically recovered.
+        /// Override to re-register consumers or perform other post-recovery actions.
+        /// </summary>
+        protected virtual void OnRecovered() { }
+
         public IBasicProperties GetBasicProperties()
         {
             lock (_lock)
@@ -102,9 +178,24 @@ public void ClearConnection()
             }
             finally
             {
-                _connection?.Close();
-                _connection?.Dispose();
-                _connection = null;
+                try
+                {
+                    if (_connection != null)
+                    {
+                        if (_connection is IAutorecoveringConnection autorecovering)
+                        {
+                            autorecovering.RecoverySucceeded -= OnConnectionRecoverySucceeded;
+                            autorecovering.ConnectionRecoveryError -= OnConnectionRecoveryError;
+                        }
+                        _connection.ConnectionShutdown -= OnConnectionShutdown;
+                    }
+                }
+                finally
+                {
+                    _connection?.Close();
+                    _connection?.Dispose();
+                    _connection = null;
+                }
             }
         }
 
diff --git a/SimpleRabbit.NetCore/Service/QueueService.cs b/SimpleRabbit.NetCore/Service/QueueService.cs
index dfe8a15..efcb3c7 100644
--- a/SimpleRabbit.NetCore/Service/QueueService.cs
+++ b/SimpleRabbit.NetCore/Service/QueueService.cs
@@ -21,17 +21,21 @@ public class QueueService : BasicRabbitService, IQueueService
         private const int DefaultRetryInterval = 15;
 
         private readonly ILogger _logger;
         private readonly Timer _timer;
 
         private QueueConfiguration _queueServiceParams;
         private IMessageHandler _handler;
 
         private TimeSpan RetryInterval => TimeSpan.FromSeconds(_queueServiceParams?.RetryIntervalInSeconds ?? DefaultRetryInterval);
-        
+
         private int _retryCount;
+        // 1 while a restart is scheduled on the timer, 0 otherwise; only touched through Interlocked.
+        private int _restarting;
+        // Set by Stop() so consumer and connection callbacks do not schedule restarts while stopping.
+        private volatile bool _stopping;
         private ConcurrentBag<ulong> _toBeNackedMessages = new ConcurrentBag<ulong>();
 
-        public QueueService(RabbitConfiguration options, ILogger logger) : base(options)
+        public QueueService(RabbitConfiguration options, ILogger logger) : base(options, logger)
         {
             _logger = logger;
 
@@ -71,6 +75,7 @@ public void Start(QueueConfiguration subscriberConfiguration, IMessageHandler ha
         private void Start()
         {
             Stop();
+            _stopping = false;
             if (_handler == null)
             {
                 throw new ArgumentNullException(nameof(_handler), $"No handler provided for {_queueServiceParams.ConsumerTag} => {_queueServiceParams.QueueName}");
@@ -80,9 +85,13 @@ private void Start()
             {
                 var consumer = new AsyncEventingBasicConsumer(Channel);
                 consumer.Received += ReceiveEventAsync;
+                consumer.ConsumerCancelled += OnConsumerCancelled;
+                consumer.Shutdown += OnConsumerShutdown;
 
                 Channel.BasicQos(0, _queueServiceParams.PrefetchCount ?? 1, false);
                 Channel.BasicConsume(_queueServiceParams.QueueName, false, _queueServiceParams.DisplayName ?? _queueServiceParams.ConsumerTag, consumer);
+
+                _logger.LogInformation($"Consumer started on queue {_queueServiceParams.QueueName}");
             }
             catch (Exception e)
             {
@@ -91,6 +100,42 @@ private void Start()
             }
         }
 
+        /// <summary>
+        /// Schedules a consumer restart when the consumer is cancelled, unless the service is stopping.
+        /// </summary>
+        private Task OnConsumerCancelled(object sender, ConsumerEventArgs e)
+        {
+            if (_stopping) return Task.CompletedTask;
+            _logger.LogWarning($"Consumer cancelled by broker on queue {_queueServiceParams.QueueName}, tags: {string.Join(", ", e.ConsumerTags)}. Scheduling restart.");
+            RestartIn(RetryInterval);
+            return Task.CompletedTask;
+        }
+
+        /// <summary>
+        /// Schedules a consumer restart when the consumer reports a shutdown, unless the service is stopping.
+        /// </summary>
+        private Task OnConsumerShutdown(object sender, ShutdownEventArgs e)
+        {
+            if (_stopping) return Task.CompletedTask;
+            _logger.LogWarning($"Consumer shutdown on queue {_queueServiceParams.QueueName}: {e.ReplyText}. Scheduling restart.");
+            RestartIn(RetryInterval);
+            return Task.CompletedTask;
+        }
+
+        /// <summary>
+        /// Called by BasicRabbitService when the connection has been automatically recovered.
+        /// Re-registers the consumer since the old channel/consumer is stale after recovery.
+        /// </summary>
+        protected override void OnRecovered()
+        {
+            if (_stopping) return;
+            _logger.LogInformation($"Connection recovered, restarting consumer on queue {_queueServiceParams?.QueueName}");
+            if (_queueServiceParams != null && _handler != null)
+            {
+                RestartIn(TimeSpan.FromSeconds(1));
+            }
+        }
+
         private async Task ReceiveEventAsync(object sender, BasicDeliverEventArgs args)
         {
             var channel = (sender as AsyncEventingBasicConsumer)?.Model;
@@ -158,7 +203,17 @@ private void OnError(object sender, BasicDeliverEventArgs message)
                 default:
                     {
                         RestartIn(RetryInterval);
-                        channel.BasicNack(message.DeliveryTag, false, true);
+                        try
+                        {
+                            if (!channel.IsClosed)
+                            {
+                                channel.BasicNack(message.DeliveryTag, false, true);
+                            }
+                        }
+                        catch (Exception nackEx)
+                        {
+                            _logger.LogWarning(nackEx, $"Failed to nack message {message.DeliveryTag} on queue {_queueServiceParams.QueueName} (channel may be closed)");
+                        }
                         return;
                     }
             }
@@ -172,50 +227,77 @@ private void OnError(object sender, BasicDeliverEventArgs message)
 
+        /// <summary>
+        /// Schedules one consumer restart after <paramref name="waitInterval"/>, multiplied by the retry count when AutoBackOff is set.
+        /// Calls made while a restart is already pending return without doing anything.
+        /// </summary>
         private void RestartIn(TimeSpan waitInterval)
         {
-            if (_timer.Enabled)
+            // Use atomic compare-exchange to ensure only one thread enters the restart path.
+            if (System.Threading.Interlocked.CompareExchange(ref _restarting, 1, 0) != 0)
             {
-                // another message has already triggered an error.
+                // Another thread is already handling the restart.
                 return;
             }
 
-            if (_queueServiceParams.OnErrorAction == QueueConfiguration.ErrorAction.RestartConnection)
+            try
             {
-                try
-                {
-                    //take note of blocking if clearing connection here
-                    //https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/341
-                    // attempt to stop the event consumption.
-                    if (Channel != null && !Channel.IsClosed)
-                        Channel?.BasicCancel(_queueServiceParams.ConsumerTag);
-                }
-                catch
+                if (_queueServiceParams.OnErrorAction == QueueConfiguration.ErrorAction.RestartConnection)
                 {
-                    // ignored
+                    try
+                    {
+                        //take note of blocking if clearing connection here
+                        //https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/341
+                        // attempt to stop the event consumption.
+                        if (Channel != null && !Channel.IsClosed)
+                            Channel?.BasicCancel(_queueServiceParams.DisplayName ?? _queueServiceParams.ConsumerTag);
+                    }
+                    catch
+                    {
+                        // ignored
+                    }
                 }
-            }
 
-            _retryCount++;
-            var interval = waitInterval.TotalSeconds * (_queueServiceParams.AutoBackOff ? _retryCount : 1) % MaxRetryInterval;
+                _retryCount++;
+                // Clamp to MaxRetryInterval; the previous '%' could yield 0, which the timer's Interval setter rejects.
+                var interval = Math.Min(waitInterval.TotalSeconds * (_queueServiceParams.AutoBackOff ? _retryCount : 1), MaxRetryInterval);
 
-            _timer.Interval = interval * 1000; // seconds
-            _logger.LogInformation($" -> restarting in {interval} seconds ({_retryCount}).");
-            _timer.Start();
+                _timer.Interval = interval * 1000; // seconds
+                _logger.LogInformation($" -> restarting in {interval} seconds ({_retryCount}).");
+                _timer.Start();
+            }
+            catch (Exception e)
+            {
+                _logger.LogError(e, $"Error scheduling restart for queue {_queueServiceParams.QueueName}");
+                // Reset the flag so a future attempt can try again
+                System.Threading.Interlocked.Exchange(ref _restarting, 0);
+            }
         }
 
 
         private void TimerActivation()
         {
+            // Reset the restart flag so future errors can schedule a new restart.
+            System.Threading.Interlocked.Exchange(ref _restarting, 0);
+
             switch (_queueServiceParams.OnErrorAction)
             {
                 case QueueConfiguration.ErrorAction.NackOnException:
                     {
-                        foreach (var message in _toBeNackedMessages)
+                        try
                         {
-                            if (Channel != null && !Channel.IsClosed)
+                            foreach (var message in _toBeNackedMessages)
                             {
-                                Channel.BasicNack(message, false, true);
+                                if (Channel != null && !Channel.IsClosed)
+                                {
+                                    Channel.BasicNack(message, false, true);
+                                }
                             }
-
+                        }
+                        catch (Exception e)
+                        {
+                            _logger.LogWarning(e, $"Failed to nack queued messages on {_queueServiceParams.QueueName}, scheduling restart");
+                            _toBeNackedMessages.Clear();
+                            RestartIn(RetryInterval);
+                            return;
                         }
                         _toBeNackedMessages.Clear();
                         return;
@@ -231,7 +313,19 @@ private void TimerActivation()
 
         public void Stop()
         {
+            _stopping = true;
+            System.Threading.Interlocked.Exchange(ref _restarting, 0);
+            _timer.Stop();
             Close();
         }
+
+        /// <summary>
+        /// Stops and disposes the restart timer.
+        /// </summary>
+        protected override void Cleanup()
+        {
+            _timer?.Stop();
+            _timer?.Dispose();
+        }
     }
 }