Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,54 +1,38 @@
using System.Collections.Concurrent;
using System.Threading.Channels;
using HotChocolate.Utilities;

namespace HotChocolate.Transport.Sockets.Client.Protocols;

internal sealed class DataMessageObserver(string id) : IObserver<IOperationMessage>, IDisposable
{
private readonly SemaphoreSlim _semaphore = new(0);
private readonly ConcurrentQueue<IDataMessage> _messages = new();
private Exception? _error;
private bool _disposed;
private readonly Channel<IDataMessage> _channel = Channel.CreateUnbounded<IDataMessage>();

public async ValueTask<IDataMessage?> TryReadNextAsync(CancellationToken ct)
{
ObjectDisposedException.ThrowIf(_disposed, this);

await _semaphore.WaitAsync(ct);

if (_error is not null)
try
{
throw _error;
return await _channel.Reader.ReadAsync(ct);
}
catch (ChannelClosedException)
{
return null;
}

_messages.TryDequeue(out var message);
return message;
}

public void OnNext(IOperationMessage value)
{
if (value is IDataMessage message && message.Id.EqualsOrdinal(id))
{
_messages.Enqueue(message);
_semaphore.Release();
_channel.Writer.TryWrite(message);
}
}

public void OnError(Exception error)
{
_error = error;
_semaphore.Release();
}
=> _channel.Writer.TryComplete(error);

public void OnCompleted()
=> _semaphore.Release();
=> _channel.Writer.TryComplete();

public void Dispose()
{
if (!_disposed)
{
_semaphore.Dispose();
_disposed = true;
}
}
=> _channel.Writer.TryComplete();
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,8 @@ async Task<bool> ISocket.ReadMessageAsync(

return read > 0;
}
catch
catch (Exception ex) when (ex is not OperationCanceledException)
{
// swallow exception, there's nothing we can reasonably do.
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,30 +62,35 @@ public async IAsyncEnumerator<OperationResult> GetAsyncEnumerator(

IDataMessage? message;

do
try
{
message = await observer.TryReadNextAsync(cancellationToken);

switch (message)
do
{
case NextMessage next:
yield return next.Payload;
break;
message = await observer.TryReadNextAsync(cancellationToken);

case ErrorMessage error:
yield return error.Payload;
message = null;
completion.MarkDataStreamCompleted();
break;
switch (message)
{
case NextMessage next:
yield return next.Payload;
break;

case CompleteMessage:
message = null;
completion.MarkDataStreamCompleted();
break;
}
} while (!cancellationToken.IsCancellationRequested && message is not null);
case ErrorMessage error:
yield return error.Payload;
message = null;
completion.MarkDataStreamCompleted();
break;

completion.TrySendCompleteMessage();
case CompleteMessage:
message = null;
completion.MarkDataStreamCompleted();
break;
}
} while (!cancellationToken.IsCancellationRequested && message is not null);
}
finally
{
completion.TrySendCompleteMessage();
}
}

public void Dispose()
Expand Down
Loading