diff --git a/Sources/OpenAPIURLSession/URLSessionBidirectionalStreaming/BidirectionalStreamingURLSessionDelegate.swift b/Sources/OpenAPIURLSession/URLSessionBidirectionalStreaming/BidirectionalStreamingURLSessionDelegate.swift index f457582..e9eade6 100644 --- a/Sources/OpenAPIURLSession/URLSessionBidirectionalStreaming/BidirectionalStreamingURLSessionDelegate.swift +++ b/Sources/OpenAPIURLSession/URLSessionBidirectionalStreaming/BidirectionalStreamingURLSessionDelegate.swift @@ -111,7 +111,15 @@ final class BidirectionalStreamingURLSessionDelegate: NSObject, URLSessionTaskDe let (inputStream, outputStream) = createStreamPair(withBufferSize: requestStreamBufferSize) // Bridge the output stream to the request body (which opens the output stream). - requestStream = HTTPBodyOutputStreamBridge(outputStream, requestBody!) + requestStream = HTTPBodyOutputStreamBridge(outputStream, requestBody!) { [weak self, weak task] error in + guard let self else { + return + } + responseBodyStreamSource.finish(throwing: error) + responseContinuation?.resume(throwing: error) + responseContinuation = nil + task?.cancel() + } // Return the new input stream (unopened, it gets opened by URLSession). return inputStream diff --git a/Sources/OpenAPIURLSession/URLSessionBidirectionalStreaming/HTTPBodyOutputStreamBridge.swift b/Sources/OpenAPIURLSession/URLSessionBidirectionalStreaming/HTTPBodyOutputStreamBridge.swift index b416296..922b7da 100644 --- a/Sources/OpenAPIURLSession/URLSessionBidirectionalStreaming/HTTPBodyOutputStreamBridge.swift +++ b/Sources/OpenAPIURLSession/URLSessionBidirectionalStreaming/HTTPBodyOutputStreamBridge.swift @@ -21,14 +21,16 @@ final class HTTPBodyOutputStreamBridge: NSObject, StreamDelegate { let httpBody: HTTPBody let outputStream: OutputStream + let producerErrorHandler: (any Error) -> Void private(set) var state: State { didSet { debug("Output stream delegate state transition: \(oldValue) -> \(state)") } } /// Creates a new `HTTPBodyOutputStreamBridge` and opens the output stream. - init(_ outputStream: OutputStream, _ httpBody: HTTPBody) { + init(_ outputStream: OutputStream, _ httpBody: HTTPBody, producerErrorHandler: @escaping (any Error) -> Void) { self.httpBody = httpBody self.outputStream = outputStream + self.producerErrorHandler = producerErrorHandler self.state = .initial super.init() self.outputStream.delegate = self @@ -62,13 +64,22 @@ final class HTTPBodyOutputStreamBridge: NSObject, StreamDelegate { dispatchPrecondition(condition: .onQueue(Self.streamQueue)) let task = Task { dispatchPrecondition(condition: .notOnQueue(Self.streamQueue)) - for try await chunk in httpBody { - try await withCheckedThrowingContinuation { continuation in - Self.streamQueue.async { - debug("Output stream delegate produced chunk and suspended producer.") - self.performAction(self.state.producedChunkAndSuspendedProducer(chunk, continuation)) + do { + for try await chunk in httpBody { + do { + try await withCheckedThrowingContinuation { continuation in + Self.streamQueue.async { + debug("Output stream delegate produced chunk and suspended producer.") + self.performAction(self.state.producedChunkAndSuspendedProducer(chunk, continuation)) + } + } + } catch { + // Ignore errors when sending the request body + break } } + } catch { + self.producerErrorHandler(error) } Self.streamQueue.async { debug("Output stream delegate wrote final chunk.") @@ -124,7 +135,7 @@ final class HTTPBodyOutputStreamBridge: NSObject, StreamDelegate { extension HTTPBodyOutputStreamBridge { typealias Chunk = ArraySlice - typealias ProducerTask = Task + typealias ProducerTask = Task typealias ProducerContinuation = CheckedContinuation enum State {