Rework MQTT Connection Logic #2

Open

yulivee wants to merge 5 commits into master from feature/mqtt-rework-issues

Conversation

yulivee (Contributor) commented Feb 8, 2026

  • Replace hand-rolled exponential backoff reconnection (ScheduleReconnectionAsync + TrackBackgroundTask chaining)
    with a Polly ResiliencePipeline, eliminating unbounded task chain growth when the broker is unreachable (a rough
    sketch of such a pipeline is shown after this list)
  • Add connection loss detection: PublishAsync and SubscribeAsync now catch MqttClientNotConnectedException, update
    connection state, and automatically trigger the reconnection loop
  • Add bounded command queue (CommandQueueDepth, default 20) that discards and logs commands when full, preventing
    unbounded memory growth while disconnected
  • Make MaxReconnectionAttempts configurable (default 20) instead of retrying indefinitely
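
For readers unfamiliar with Polly's v8 API, here is a minimal sketch of what such a reconnection pipeline can look like. The option values mirror the defaults described above; ConnectCoreAsync is a placeholder for a single connection attempt, not the actual method from this PR.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Polly;
using Polly.Retry;

// Placeholder for whatever performs one connection attempt in the real service.
static Task ConnectCoreAsync(CancellationToken ct) => Task.CompletedTask;

// Bounded retries with exponential backoff and jitter replace the hand-rolled
// ScheduleReconnectionAsync chaining described in the PR.
var reconnectPipeline = new ResiliencePipelineBuilder()
    .AddRetry(new RetryStrategyOptions
    {
        ShouldHandle = new PredicateBuilder().Handle<Exception>(),
        MaxRetryAttempts = 20,                      // MaxReconnectionAttempts default
        BackoffType = DelayBackoffType.Exponential,
        Delay = TimeSpan.FromSeconds(2),
        UseJitter = true
    })
    .Build();

// A single awaited pipeline execution replaces the chained background tasks.
await reconnectPipeline.ExecuteAsync(
    async ct => await ConnectCoreAsync(ct),
    CancellationToken.None);
```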

greptile-apps bot commented Feb 8, 2026

Greptile Overview

Greptile Summary

This PR successfully replaces hand-rolled exponential backoff reconnection logic with a Polly ResiliencePipeline, addressing unbounded task chain growth when the MQTT broker is unreachable.

Key improvements:

  • Polly-based retry strategy with configurable MaxReconnectionAttempts (default 20) prevents indefinite reconnection attempts
  • Connection loss detection via MqttClientNotConnectedException in PublishAsync and SubscribeAsync automatically triggers reconnection
  • Bounded command queue (CommandQueueDepth, default 20) prevents unbounded memory growth during disconnection
  • Enhanced disposal with timeout-based cleanup of background tasks

Issues found:

  • Critical race condition in EnqueueCommand (lines 359-369): the check-then-increment pattern is not atomic, allowing multiple threads to exceed the configured queue depth (see the sketch after this list)
  • Potential race condition in HandleConnectionLost (lines 229-244): _isConnected is modified without holding _connectionLock, which could cause inconsistent state during concurrent connection attempts
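
One way to close that check-then-increment race, sketched here under the assumption that the count and queue live in fields like those named in the review (this is not the PR's code): reserve a slot with Interlocked.Increment first and roll the reservation back if the bound was crossed.

```csharp
using System.Collections.Concurrent;
using System.Threading;

// Hedged sketch (not the PR's code): a bounded queue whose admission check cannot
// over-admit. A slot is reserved with Interlocked.Increment before enqueuing and
// released again if the depth was exceeded or when an item is dequeued.
sealed class BoundedCommandQueue
{
    private readonly ConcurrentQueue<(string Topic, string Payload)> _queue = new();
    private readonly int _depth;
    private int _count;

    public BoundedCommandQueue(int depth) => _depth = depth;

    public bool TryEnqueue(string topic, string payload)
    {
        if (Interlocked.Increment(ref _count) > _depth)
        {
            Interlocked.Decrement(ref _count);   // give the reserved slot back
            return false;                        // caller logs and discards the command
        }
        _queue.Enqueue((topic, payload));
        return true;
    }

    public bool TryDequeue(out (string Topic, string Payload) command)
    {
        if (_queue.TryDequeue(out command))
        {
            Interlocked.Decrement(ref _count);
            return true;
        }
        return false;
    }
}
```

System.Threading.Channels with Channel.CreateBounded and BoundedChannelFullMode.DropWrite would achieve the same bound with the bookkeeping handled by the framework.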

Minor improvements:

  • Added ArgumentNullException.ThrowIfNull validation across MQTT services
  • Refactored DimmerCommandPublisher to extract PublishPowerCommandAsync helper
  • Improved disposal of Rx subjects with OnCompleted() calls
  • Added double-dispose protection in AlarmStateMachine (the disposal pattern is sketched after this list)
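
A rough illustration of the disposal pattern these two bullets refer to; the class and subject names are placeholders rather than LumiRise types.

```csharp
using System;
using System.Reactive.Subjects;
using System.Threading;

// Rough illustration only: complete the Rx subject before disposing it, and use an
// Interlocked flag so a second Dispose() call is a no-op.
sealed class ConnectionStatePublisher : IDisposable
{
    private readonly Subject<bool> _connectionStateSubject = new();
    private int _disposed;

    public IObservable<bool> ConnectionState => _connectionStateSubject;

    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) == 1)
        {
            return;                              // already disposed
        }
        _connectionStateSubject.OnCompleted();   // let subscribers complete gracefully
        _connectionStateSubject.Dispose();
    }
}
```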

Confidence Score: 3/5

  • This PR has solid architectural improvements but contains two race conditions that could cause issues under concurrent load
  • The Polly integration follows project conventions and solves the unbounded task chain problem. However, the race condition in EnqueueCommand could allow the queue to exceed its configured depth under concurrent access, and the unsynchronized _isConnected modification in HandleConnectionLost could lead to inconsistent connection state. These threading issues need to be resolved before merging.
  • Pay close attention to src/LumiRise.Api/Services/Mqtt/Implementation/MqttConnectionManager.cs lines 229-244 and 359-369 for race condition fixes

greptile-apps[bot]: This comment was marked as outdated.

greptile-apps bot commented Feb 8, 2026

Greptile Overview

Greptile Summary

Replaced hand-rolled exponential backoff reconnection with Polly ResiliencePipeline, eliminating unbounded task chain growth from the previous approach. Added connection loss detection via MqttClientNotConnectedException catching in PublishAsync/SubscribeAsync, and implemented bounded command queue with configurable depth to prevent unbounded memory growth while disconnected.

Confidence Score: 3/5

  • This PR has important improvements but contains critical concurrency bugs that need resolution before merge
  • The Polly integration is clean and eliminates unbounded task chains as intended. However, the deadlock risk from awaiting _queueDrainTask while holding _connectionLock (lines 155-156) is a blocking issue (see the sketch after this list), and the race condition in EnqueueCommand could cause count mismatches. The unsynchronized _isConnected check in the drain loop is minor but should be fixed. Test coverage with ErrorFailingLogger is excellent.
  • Pay close attention to src/LumiRise.Api/Services/Mqtt/Implementation/MqttConnectionManager.cs — the deadlock and race conditions need resolution
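
For context on that blocking issue, a hedged sketch of the usual remedy follows: snapshot the drain task while the lock is held, release the lock, then await outside it. The field names mirror the review, _connectionLock is assumed to be a SemaphoreSlim, and this is not the actual MqttConnectionManager code.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hedged sketch: never await the drain task while _connectionLock is held, since
// the drain work may itself need that lock and the await could then never complete.
sealed class DrainExample
{
    private readonly SemaphoreSlim _connectionLock = new(1, 1);
    private Task? _queueDrainTask;

    public async Task WaitForDrainAsync(CancellationToken ct)
    {
        Task? drainTask;

        await _connectionLock.WaitAsync(ct);
        try
        {
            drainTask = _queueDrainTask;   // snapshot under the lock
            _queueDrainTask = null;
        }
        finally
        {
            _connectionLock.Release();     // release before awaiting
        }

        if (drainTask is not null)
        {
            await drainTask;               // safe now that the lock is no longer held
        }
    }
}
```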

greptile-apps[bot]: This comment was marked as outdated.

greptile-apps bot commented Feb 8, 2026

Greptile Overview

Greptile Summary

Replaces hand-rolled exponential backoff reconnection with Polly ResiliencePipeline, eliminating unbounded task chain growth. Adds connection loss detection in PublishAsync and SubscribeAsync that catches MqttClientNotConnectedException, updates state, and triggers reconnection. Implements bounded command queue (default 20 items) that discards commands when full to prevent unbounded memory growth during disconnection. Makes MaxReconnectionAttempts configurable (default 20) instead of retrying indefinitely.

Key improvements:

  • Polly-based resilience replaces custom reconnection chaining (aligns with CLAUDE.md convention)
  • Bounded queue prevents memory leaks during extended disconnection
  • Automatic connection loss detection in publish/subscribe paths

Critical issue found:

  • Infinite recursion risk in the ProcessQueuedCommandsAsync → PublishAsync → EnqueueCommand loop when publish repeatedly fails with non-connection errors (e.g., timeouts)

Confidence Score: 2/5

  • Critical infinite recursion risk requires resolution before merge
  • Found critical infinite recursion vulnerability in ProcessQueuedCommandsAsync that can cause unbounded retries when publish fails with timeouts or other non-connection errors. Multiple race conditions noted in previous review threads remain unaddressed. The Polly integration and bounded queue are good improvements, but the drain loop logic has fundamental flaws
  • src/LumiRise.Api/Services/Mqtt/Implementation/MqttConnectionManager.cs requires attention - specifically the queue drain loop and command retry logic

greptile-apps bot left a comment

1 file reviewed, 5 comments

  private async Task ProcessQueuedCommandsAsync(CancellationToken ct)
  {
-     while (!ct.IsCancellationRequested && _commandQueue.TryDequeue(out var command))
+     while (!ct.IsCancellationRequested && Volatile.Read(ref _isConnected) && _commandQueue.TryDequeue(out var command))

Volatile.Read(ref _isConnected) checks volatile boolean, but _isConnected is not marked as volatile, making this read behave like a normal read

Suggested change
-     while (!ct.IsCancellationRequested && Volatile.Read(ref _isConnected) && _commandQueue.TryDequeue(out var command))
+     while (!ct.IsCancellationRequested && _isConnected && _commandQueue.TryDequeue(out var command))
Path: src/LumiRise.Api/Services/Mqtt/Implementation/MqttConnectionManager.cs, line 395

      {
          _logger.LogInformation("Processing queued command: {Topic} = {Payload}",
              command.Topic, command.Payload);
          await PublishAsync(command.Topic, command.Payload, ct);

Infinite recursion risk: ProcessQueuedCommandsAsync calls PublishAsync, which on failure calls EnqueueCommand to re-queue the item and may continue the drain loop without exiting. If PublishAsync fails repeatedly (e.g., timeout), the same command gets re-queued and re-processed indefinitely

Path: src/LumiRise.Api/Services/Mqtt/Implementation/MqttConnectionManager.cs, line 402

          _logger.LogError(ex, "Error processing queued command");
          // Re-queue on error
-         _commandQueue.Enqueue(command);
+         EnqueueCommand(command.Topic, command.Payload);

Re-queuing on generic exceptions could cause infinite retry loop for commands that will always fail (e.g., malformed payload). Consider logging and discarding instead of re-queuing, or add retry limit per command

Path: src/LumiRise.Api/Services/Mqtt/Implementation/MqttConnectionManager.cs, line 417
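
A hedged sketch of the per-command retry limit suggested above; QueuedCommand and the limit of 3 are illustrative rather than the PR's actual types.

```csharp
// Hedged sketch: carry an attempt counter with each queued command so the drain
// loop drops it after a few failures instead of re-queuing it forever.
public sealed record QueuedCommand(string Topic, string Payload, int Attempts = 0);

public static class CommandRetryPolicy
{
    public const int MaxAttemptsPerCommand = 3;

    // Returns the command to re-queue, or null when it should be logged and dropped.
    public static QueuedCommand? NextAttemptOrDrop(QueuedCommand failed) =>
        failed.Attempts + 1 >= MaxAttemptsPerCommand
            ? null
            : failed with { Attempts = failed.Attempts + 1 };
}
```

The drain loop's catch block would then call NextAttemptOrDrop and only re-queue a non-null result, logging and discarding the command otherwise.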

result.ResultCode);
_connectionStateSubject.OnNext(connectionState);
// Process any queued commands (fire-and-forget; tracked via _queueDrainTask for disposal)
_queueDrainTask = ProcessQueuedCommandsAsync(_disposalCts.Token);

Fire-and-forget assignment creates cleanup uncertainty. _queueDrainTask is assigned a running task without awaiting, and later disposal logic depends on this field being set. If ConnectCoreAsync is called multiple times or disposal happens before assignment completes, tracking could fail

Path: src/LumiRise.Api/Services/Mqtt/Implementation/MqttConnectionManager.cs, line 155
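
One hedged way to make that tracking deterministic, assuming ConnectCoreAsync already runs while _connectionLock is held so this check is not itself racy (names are illustrative, not the PR's code):

```csharp
// Start a new drain task only when the previous one has finished, and keep the
// handle so disposal always has exactly one task to await.
if (_queueDrainTask is null || _queueDrainTask.IsCompleted)
{
    _queueDrainTask = Task.Run(
        () => ProcessQueuedCommandsAsync(_disposalCts.Token),
        _disposalCts.Token);
}
```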

try
// Check capacity before enqueuing (slight over-admit is acceptable
// under contention; the queue depth is a soft limit)
if (Volatile.Read(ref _queuedCommandCount) >= _options.CommandQueueDepth)

Volatile.Read used for _queuedCommandCount but Interlocked operations on same field. Since Interlocked already provides memory barriers, Volatile.Read is unnecessary here

Path: src/LumiRise.Api/Services/Mqtt/Implementation/MqttConnectionManager.cs, line 378
