Conversation
Greptile Overview

Greptile Summary
This PR successfully replaces the hand-rolled exponential backoff reconnection logic with a Polly ResiliencePipeline, eliminating unbounded task chain growth when the broker is unreachable.
Key improvements: connection events update the exposed connection state and automatically trigger the reconnection loop, and queued commands are bounded to prevent unbounded memory growth while disconnected.
Issues found:
Minor improvements:
Confidence Score: 3/5

Greptile Summary
Replaced hand-rolled exponential backoff reconnection with Polly.
Confidence Score: 3/5

Greptile Summary
Replaces hand-rolled exponential backoff reconnection with Polly.
Key improvements:
Critical issue found:
Confidence Score: 2/5
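For context, a minimal sketch of the kind of Polly v8 retry pipeline the summary describes. The option values, `reconnectPipeline`, and `cancellationToken` are illustrative assumptions, not the PR's actual configuration; only `ConnectCoreAsync` comes from the code under review.

```csharp
using System;
using Polly;
using Polly.Retry;

// Illustrative sketch: an effectively infinite exponential-backoff retry with
// jitter and a capped delay, driving reconnect attempts instead of a
// hand-rolled backoff loop.
ResiliencePipeline reconnectPipeline = new ResiliencePipelineBuilder()
    .AddRetry(new RetryStrategyOptions
    {
        MaxRetryAttempts = int.MaxValue,
        BackoffType = DelayBackoffType.Exponential,
        UseJitter = true,
        Delay = TimeSpan.FromSeconds(1),
        MaxDelay = TimeSpan.FromMinutes(1)
    })
    .Build();

// Keeps retrying ConnectCoreAsync until it succeeds or the token is cancelled.
await reconnectPipeline.ExecuteAsync(
    async token => await ConnectCoreAsync(token),
    cancellationToken);
```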
Path: src/LumiRise.Api/Services/Mqtt/Implementation/MqttConnectionManager.cs
Line: 395

```diff
 private async Task ProcessQueuedCommandsAsync(CancellationToken ct)
 {
-    while (!ct.IsCancellationRequested && _commandQueue.TryDequeue(out var command))
+    while (!ct.IsCancellationRequested && Volatile.Read(ref _isConnected) && _commandQueue.TryDequeue(out var command))
```
`Volatile.Read(ref _isConnected)` checks a volatile boolean, but `_isConnected` is not marked as `volatile`, making this read behave like a normal read.
```suggestion
while (!ct.IsCancellationRequested && _isConnected && _commandQueue.TryDequeue(out var command))
```
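If the connected-flag check is worth keeping, an alternative sketch (not the PR's code) is to declare the field `volatile`, so every plain read and write of it already has acquire/release semantics:

```csharp
// Sketch only: a volatile field avoids calling Volatile.Read at each use site.
private volatile bool _isConnected;

private async Task ProcessQueuedCommandsAsync(CancellationToken ct)
{
    while (!ct.IsCancellationRequested && _isConnected && _commandQueue.TryDequeue(out var command))
    {
        // ... publish the dequeued command ...
    }
}
```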
Path: src/LumiRise.Api/Services/Mqtt/Implementation/MqttConnectionManager.cs
Line: 402

```csharp
{
    _logger.LogInformation("Processing queued command: {Topic} = {Payload}",
        command.Topic, command.Payload);
    await PublishAsync(command.Topic, command.Payload, ct);
```
Infinite recursion risk: `ProcessQueuedCommandsAsync` calls `PublishAsync`, which on failure calls `EnqueueCommand` to re-queue the item and may continue the drain loop without exiting. If `PublishAsync` fails repeatedly (e.g., timeout), the same command gets re-queued and re-processed indefinitely.
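One way to break the cycle, sketched against the names used in this file (the exact exception handling here is an assumption): stop draining on the first failed publish and let the next successful connect restart the drain.

```csharp
// Sketch only: exit the drain loop on a failed publish instead of re-queueing
// and immediately retrying the same command.
while (!ct.IsCancellationRequested && _commandQueue.TryDequeue(out var command))
{
    try
    {
        await PublishAsync(command.Topic, command.Payload, ct);
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "Publish failed; re-queueing {Topic} and pausing drain", command.Topic);
        EnqueueCommand(command.Topic, command.Payload);
        break; // the next reconnect starts a fresh drain
    }
}
```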
Path: src/LumiRise.Api/Services/Mqtt/Implementation/MqttConnectionManager.cs
Line: 417

```diff
 _logger.LogError(ex, "Error processing queued command");
 // Re-queue on error
-_commandQueue.Enqueue(command);
+EnqueueCommand(command.Topic, command.Payload);
```
Re-queuing on generic exceptions could cause an infinite retry loop for commands that will always fail (e.g., malformed payload). Consider logging and discarding instead of re-queuing, or add a retry limit per command.
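A possible shape for the per-command retry limit, as a sketch of the catch block above. `Attempts` on the queued command and `MaxCommandAttempts` on the options are assumed members, not existing code.

```csharp
// Sketch only: drop a command after a bounded number of failed attempts.
catch (Exception ex)
{
    _logger.LogError(ex, "Error processing queued command");
    if (command.Attempts < _options.MaxCommandAttempts)   // assumed members
    {
        command.Attempts++;
        _commandQueue.Enqueue(command); // retry later
    }
    else
    {
        _logger.LogWarning("Dropping command for {Topic} after {Count} failed attempts",
            command.Topic, command.Attempts);
    }
}
```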
Path: src/LumiRise.Api/Services/Mqtt/Implementation/MqttConnectionManager.cs
Line: 155

```csharp
    result.ResultCode);
_connectionStateSubject.OnNext(connectionState);

// Process any queued commands (fire-and-forget; tracked via _queueDrainTask for disposal)
_queueDrainTask = ProcessQueuedCommandsAsync(_disposalCts.Token);
```
Fire-and-forget assignment creates cleanup uncertainty. `_queueDrainTask` is assigned a running task without being awaited, and later disposal logic depends on this field being set. If `ConnectCoreAsync` is called multiple times or disposal happens before the assignment completes, tracking could fail.
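One way to tighten this up, as a sketch only: guard the handoff so at most one drain task runs at a time. `_queueDrainGate` is an assumed private lock object, not an existing field.

```csharp
// Sketch only: serialize the drain-task handoff so disposal always observes
// the task that was actually started.
lock (_queueDrainGate)
{
    if (_queueDrainTask is null || _queueDrainTask.IsCompleted)
    {
        _queueDrainTask = ProcessQueuedCommandsAsync(_disposalCts.Token);
    }
}
```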
Path: src/LumiRise.Api/Services/Mqtt/Implementation/MqttConnectionManager.cs
Line: 378

```csharp
try
{
    // Check capacity before enqueuing (slight over-admit is acceptable
    // under contention; the queue depth is a soft limit)
    if (Volatile.Read(ref _queuedCommandCount) >= _options.CommandQueueDepth)
```
`Volatile.Read` is used for `_queuedCommandCount`, but `Interlocked` operations are used on the same field. Since `Interlocked` already provides memory barriers, `Volatile.Read` is unnecessary here.
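If the counter should only ever be touched through one primitive, a sketch of routing the read through `Interlocked` as well (a plain read of the field is the simpler alternative the comment implies); the surrounding enqueue code is assumed, not quoted from the PR.

```csharp
// Sketch only: Interlocked.CompareExchange with identical value and comparand
// performs an atomic read, so every access to _queuedCommandCount goes through
// the same mechanism as the increments/decrements.
if (Interlocked.CompareExchange(ref _queuedCommandCount, 0, 0) >= _options.CommandQueueDepth)
{
    return; // queue full; drop or log as appropriate
}
Interlocked.Increment(ref _queuedCommandCount);
```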