Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 83 additions & 5 deletions SimpleRabbit.NetCore/Service/BasicRabbitService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
using System.Linq;
using System.Security.Authentication;
using System.Threading;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace SimpleRabbit.NetCore
{
Expand Down Expand Up @@ -58,26 +60,87 @@ protected ConnectionFactory Factory
}
}
private string ClientName =>
_config?.Name ??
_config?.Name ??
Environment.GetEnvironmentVariable("COMPUTERNAME") ??
Environment.GetEnvironmentVariable("HOSTNAME");

private readonly RabbitConfiguration _config;
private readonly ILogger _baseLogger;

protected BasicRabbitService(RabbitConfiguration config)
{
_config = config;
}

protected BasicRabbitService(RabbitConfiguration config, ILogger logger) : this(config)
{
_baseLogger = logger;
}

private IConnection _connection;
/// <summary>
/// ClientName is used only for human reference from RabbitMQ UI.
/// </summary>
protected IConnection Connection => _connection ?? (_connection = Factory.CreateConnection(_hostnames, ClientName));
protected IConnection Connection
{
get
{
if (_connection == null)
{
_connection = Factory.CreateConnection(_hostnames, ClientName);

if (_connection is IAutorecoveringConnection autorecovering)
{
autorecovering.RecoverySucceeded += OnConnectionRecoverySucceeded;
autorecovering.ConnectionRecoveryError += OnConnectionRecoveryError;
}

_connection.ConnectionShutdown += OnConnectionShutdown;
}

return _connection;
}
}

private IModel _channel;
protected IModel Channel => _channel ?? (_channel = Connection.CreateModel());

private void OnConnectionRecoverySucceeded(object sender, EventArgs e)
{
_baseLogger?.LogInformation("RabbitMQ connection recovered successfully, invalidating channel");
lock (_lock)
{
// Invalidate the stale channel so it gets recreated on next access.
// The connection itself is still valid (it just recovered).
try
{
_channel?.Dispose();
}
catch
{
// Channel may already be disposed after recovery
}
_channel = null;
}
OnRecovered();
}

private void OnConnectionRecoveryError(object sender, ConnectionRecoveryErrorEventArgs e)
{
_baseLogger?.LogError(e.Exception, "RabbitMQ connection recovery failed");
}

private void OnConnectionShutdown(object sender, ShutdownEventArgs e)
{
_baseLogger?.LogWarning("RabbitMQ connection shutdown: {Reason}", e.ReplyText);
}

/// <summary>
/// Called when the connection has been automatically recovered.
/// Override to re-register consumers or perform other post-recovery actions.
/// </summary>
protected virtual void OnRecovered() { }

public IBasicProperties GetBasicProperties()
{
lock (_lock)
Expand All @@ -102,9 +165,24 @@ public void ClearConnection()
}
finally
{
_connection?.Close();
_connection?.Dispose();
_connection = null;
try
{
if (_connection != null)
{
if (_connection is IAutorecoveringConnection autorecovering)
{
autorecovering.RecoverySucceeded -= OnConnectionRecoverySucceeded;
autorecovering.ConnectionRecoveryError -= OnConnectionRecoveryError;
}
_connection.ConnectionShutdown -= OnConnectionShutdown;
}
}
finally
{
_connection?.Close();
_connection?.Dispose();
_connection = null;
}
}

}
Expand Down
133 changes: 106 additions & 27 deletions SimpleRabbit.NetCore/Service/QueueService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace SimpleRabbit.NetCore
Expand All @@ -21,17 +22,20 @@ public class QueueService : BasicRabbitService, IQueueService
private const int DefaultRetryInterval = 15;

private readonly ILogger<QueueService> _logger;
private readonly object _restartLock = new object();

private readonly Timer _timer;
private QueueConfiguration _queueServiceParams;
private IMessageHandler _handler;
private TimeSpan RetryInterval => TimeSpan.FromSeconds(_queueServiceParams?.RetryIntervalInSeconds ?? DefaultRetryInterval);

private int _retryCount;
private int _restarting;
private volatile bool _stopping;

private ConcurrentBag<ulong> _toBeNackedMessages = new ConcurrentBag<ulong>();

public QueueService(RabbitConfiguration options, ILogger<QueueService> logger) : base(options)
public QueueService(RabbitConfiguration options, ILogger<QueueService> logger) : base(options, logger)
{
_logger = logger;

Expand Down Expand Up @@ -71,6 +75,7 @@ public void Start(QueueConfiguration subscriberConfiguration, IMessageHandler ha
private void Start()
{
Stop();
_stopping = false;
if (_handler == null)
{
throw new ArgumentNullException(nameof(_handler), $"No handler provided for {_queueServiceParams.ConsumerTag} => {_queueServiceParams.QueueName}");
Expand All @@ -80,9 +85,13 @@ private void Start()
{
var consumer = new AsyncEventingBasicConsumer(Channel);
consumer.Received += ReceiveEventAsync;
consumer.ConsumerCancelled += OnConsumerCancelled;
consumer.Shutdown += OnConsumerShutdown;

Channel.BasicQos(0, _queueServiceParams.PrefetchCount ?? 1, false);
Channel.BasicConsume(_queueServiceParams.QueueName, false, _queueServiceParams.DisplayName ?? _queueServiceParams.ConsumerTag, consumer);

_logger.LogInformation($"Consumer started on queue {_queueServiceParams.QueueName}");
}
catch (Exception e)
{
Expand All @@ -91,6 +100,36 @@ private void Start()
}
}

private Task OnConsumerCancelled(object sender, ConsumerEventArgs e)
{
if (_stopping) return Task.CompletedTask;
_logger.LogWarning($"Consumer cancelled by broker on queue {_queueServiceParams.QueueName}, tags: {string.Join(", ", e.ConsumerTags)}. Scheduling restart.");
RestartIn(RetryInterval);
return Task.CompletedTask;
}

private Task OnConsumerShutdown(object sender, ShutdownEventArgs e)
{
if (_stopping) return Task.CompletedTask;
_logger.LogWarning($"Consumer shutdown on queue {_queueServiceParams.QueueName}: {e.ReplyText}. Scheduling restart.");
RestartIn(RetryInterval);
return Task.CompletedTask;
}

/// <summary>
/// Called by BasicRabbitService when the connection has been automatically recovered.
/// Re-registers the consumer since the old channel/consumer is stale after recovery.
/// </summary>
protected override void OnRecovered()
{
if (_stopping) return;
_logger.LogInformation($"Connection recovered, restarting consumer on queue {_queueServiceParams?.QueueName}");
if (_queueServiceParams != null && _handler != null)
{
RestartIn(TimeSpan.FromSeconds(1));
}
}

private async Task ReceiveEventAsync(object sender, BasicDeliverEventArgs args)
{
var channel = (sender as AsyncEventingBasicConsumer)?.Model;
Expand Down Expand Up @@ -158,7 +197,17 @@ private void OnError(object sender, BasicDeliverEventArgs message)
default:
{
RestartIn(RetryInterval);
channel.BasicNack(message.DeliveryTag, false, true);
try
{
if (!channel.IsClosed)
{
channel.BasicNack(message.DeliveryTag, false, true);
}
}
catch (Exception nackEx)
{
_logger.LogWarning(nackEx, $"Failed to nack message {message.DeliveryTag} on queue {_queueServiceParams.QueueName} (channel may be closed)");
}
return;
}
}
Expand All @@ -172,50 +221,71 @@ private void OnError(object sender, BasicDeliverEventArgs message)

private void RestartIn(TimeSpan waitInterval)
{
if (_timer.Enabled)
// Use atomic compare-exchange to ensure only one thread enters the restart path.
if (Interlocked.CompareExchange(ref _restarting, 1, 0) != 0)
{
// another message has already triggered an error.
// Another thread is already handling the restart.
return;

}

if (_queueServiceParams.OnErrorAction == QueueConfiguration.ErrorAction.RestartConnection)
try
{
try
{
//take note of blocking if clearing connection here
//https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/341
// attempt to stop the event consumption.
if (Channel != null && !Channel.IsClosed)
Channel?.BasicCancel(_queueServiceParams.ConsumerTag);
}
catch
if (_queueServiceParams.OnErrorAction == QueueConfiguration.ErrorAction.RestartConnection)
{
// ignored
try
{
//take note of blocking if clearing connection here
//https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/341
// attempt to stop the event consumption.
if (Channel != null && !Channel.IsClosed)
Channel?.BasicCancel(_queueServiceParams.DisplayName ?? _queueServiceParams.ConsumerTag);
}
catch
{
// ignored
}
}
}

_retryCount++;
var interval = waitInterval.TotalSeconds * (_queueServiceParams.AutoBackOff ? _retryCount : 1) % MaxRetryInterval;
_retryCount++;
var interval = waitInterval.TotalSeconds * (_queueServiceParams.AutoBackOff ? _retryCount : 1) % MaxRetryInterval;

_timer.Interval = interval * 1000; // seconds
_logger.LogInformation($" -> restarting in {interval} seconds ({_retryCount}).");
_timer.Start();
_timer.Interval = interval * 1000; // seconds
_logger.LogInformation($" -> restarting in {interval} seconds ({_retryCount}).");
_timer.Start();
}
catch (Exception e)
{
_logger.LogError(e, $"Error scheduling restart for queue {_queueServiceParams.QueueName}");
// Reset the flag so a future attempt can try again
Interlocked.Exchange(ref _restarting, 0);
}
}

private void TimerActivation()
{
// Reset the restart flag so future errors can schedule a new restart.
Interlocked.Exchange(ref _restarting, 0);

switch (_queueServiceParams.OnErrorAction)
{
case QueueConfiguration.ErrorAction.NackOnException:
{
foreach (var message in _toBeNackedMessages)
try
{
if (Channel != null && !Channel.IsClosed)
foreach (var message in _toBeNackedMessages)
{
Channel.BasicNack(message, false, true);
if (Channel != null && !Channel.IsClosed)
{
Channel.BasicNack(message, false, true);
}
}

}
catch (Exception e)
{
_logger.LogWarning(e, $"Failed to nack queued messages on {_queueServiceParams.QueueName}, scheduling restart");
_toBeNackedMessages.Clear();
RestartIn(RetryInterval);
return;
}
_toBeNackedMessages.Clear();
return;
Expand All @@ -231,7 +301,16 @@ private void TimerActivation()

public void Stop()
{
_stopping = true;
Interlocked.Exchange(ref _restarting, 0);
_timer.Stop();
Close();
}

protected override void Cleanup()
{
_timer?.Stop();
_timer?.Dispose();
}
}
}
Loading