diff --git a/src/HotChocolate/AspNetCore/src/Transport.Sockets.Client/Protocols/DataMessageObserver.cs b/src/HotChocolate/AspNetCore/src/Transport.Sockets.Client/Protocols/DataMessageObserver.cs index cc01b6705a4..0fa1de1d29d 100644 --- a/src/HotChocolate/AspNetCore/src/Transport.Sockets.Client/Protocols/DataMessageObserver.cs +++ b/src/HotChocolate/AspNetCore/src/Transport.Sockets.Client/Protocols/DataMessageObserver.cs @@ -1,54 +1,38 @@ -using System.Collections.Concurrent; +using System.Threading.Channels; using HotChocolate.Utilities; namespace HotChocolate.Transport.Sockets.Client.Protocols; internal sealed class DataMessageObserver(string id) : IObserver, IDisposable { - private readonly SemaphoreSlim _semaphore = new(0); - private readonly ConcurrentQueue _messages = new(); - private Exception? _error; - private bool _disposed; + private readonly Channel _channel = Channel.CreateUnbounded(); public async ValueTask TryReadNextAsync(CancellationToken ct) { - ObjectDisposedException.ThrowIf(_disposed, this); - - await _semaphore.WaitAsync(ct); - - if (_error is not null) + try { - throw _error; + return await _channel.Reader.ReadAsync(ct); + } + catch (ChannelClosedException) + { + return null; } - - _messages.TryDequeue(out var message); - return message; } public void OnNext(IOperationMessage value) { if (value is IDataMessage message && message.Id.EqualsOrdinal(id)) { - _messages.Enqueue(message); - _semaphore.Release(); + _channel.Writer.TryWrite(message); } } public void OnError(Exception error) - { - _error = error; - _semaphore.Release(); - } + => _channel.Writer.TryComplete(error); public void OnCompleted() - => _semaphore.Release(); + => _channel.Writer.TryComplete(); public void Dispose() - { - if (!_disposed) - { - _semaphore.Dispose(); - _disposed = true; - } - } + => _channel.Writer.TryComplete(); } diff --git a/src/HotChocolate/AspNetCore/src/Transport.Sockets.Client/SocketClient.cs b/src/HotChocolate/AspNetCore/src/Transport.Sockets.Client/SocketClient.cs index f13679f9711..4db2bb1095e 100644 --- a/src/HotChocolate/AspNetCore/src/Transport.Sockets.Client/SocketClient.cs +++ b/src/HotChocolate/AspNetCore/src/Transport.Sockets.Client/SocketClient.cs @@ -140,9 +140,8 @@ async Task ISocket.ReadMessageAsync( return read > 0; } - catch + catch (Exception ex) when (ex is not OperationCanceledException) { - // swallow exception, there's nothing we can reasonably do. return false; } } diff --git a/src/HotChocolate/AspNetCore/src/Transport.Sockets.Client/SocketResult.cs b/src/HotChocolate/AspNetCore/src/Transport.Sockets.Client/SocketResult.cs index 0dc5f38c3b9..fc47f8c081c 100644 --- a/src/HotChocolate/AspNetCore/src/Transport.Sockets.Client/SocketResult.cs +++ b/src/HotChocolate/AspNetCore/src/Transport.Sockets.Client/SocketResult.cs @@ -62,30 +62,35 @@ public async IAsyncEnumerator GetAsyncEnumerator( IDataMessage? message; - do + try { - message = await observer.TryReadNextAsync(cancellationToken); - - switch (message) + do { - case NextMessage next: - yield return next.Payload; - break; + message = await observer.TryReadNextAsync(cancellationToken); - case ErrorMessage error: - yield return error.Payload; - message = null; - completion.MarkDataStreamCompleted(); - break; + switch (message) + { + case NextMessage next: + yield return next.Payload; + break; - case CompleteMessage: - message = null; - completion.MarkDataStreamCompleted(); - break; - } - } while (!cancellationToken.IsCancellationRequested && message is not null); + case ErrorMessage error: + yield return error.Payload; + message = null; + completion.MarkDataStreamCompleted(); + break; - completion.TrySendCompleteMessage(); + case CompleteMessage: + message = null; + completion.MarkDataStreamCompleted(); + break; + } + } while (!cancellationToken.IsCancellationRequested && message is not null); + } + finally + { + completion.TrySendCompleteMessage(); + } } public void Dispose()