From 5c96c26f111c54c9f0addbc60d6f39158cf833f2 Mon Sep 17 00:00:00 2001 From: Jake Petroules Date: Mon, 27 Apr 2026 22:40:00 -0700 Subject: [PATCH 1/2] Fix streaming read hangs by replacing persistent DispatchIO with per-read dispatch_read MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Darwin AsyncIO implementation used a single persistent DispatchIO channel with read(length: .max), which only completes at EOF. This caused AsyncBufferSequence to hang whenever a subprocess produced less output than the buffer size, because the read would never return until the pipe closed. The preferredBufferSize parameter existed solely as a workaround — setting it to 1 made reads return byte-by-byte — but the real fix is to make reads return available data without waiting for a full buffer. Root cause: DispatchIO.read(length: .max) accumulates data internally and only resumes the continuation when done=true (EOF). The low water mark controls handler invocation granularity but not when the operation completes. This is fundamentally incompatible with streaming reads. The fix replaces the persistent DispatchIO channel with the class-level dispatch_read/dispatch_write convenience APIs, which create a temporary channel per operation and return whatever data is currently available: Darwin (AsyncIO+Dispatch.swift): - read() calls DispatchIO.read(fromFileDescriptor:maxLength:) which returns one chunk of available data per call (streaming semantics) - read(upTo: .max) loops internally until EOF (bulk semantics) - New readAll(from:upTo:) loops until maxLength bytes or EOF, used by captureOutput for collecting complete subprocess output - write() uses DispatchIO.write(toFileDescriptor:) with a retry loop for any unwritten data, matching Linux/Windows behavior - IOChannel on Darwin now stores a FileDescriptor instead of DispatchIO, removing the persistent channel entirely Linux (AsyncIO+Linux.swift) and Windows (AsyncIO+Windows.swift): - Refactored read into _read(from:upTo:returnOnFirstRead:) to share implementation between single-chunk and bulk modes - read() with finite maxLength returns after the first successful read syscall (single-chunk, for streaming) - read(upTo: .max) loops until EOF (bulk, same as before) - New readAll() loops until maxLength bytes or EOF This gives consistent cross-platform semantics: - read(upTo: finite) → single chunk (used by AsyncBufferSequence) - read(upTo: .max) → loop until EOF - readAll(upTo: N) → loop until N bytes or EOF (used by captureOutput) The preferredBufferSize parameter is now functionally unnecessary and can be removed in a subsequent change. Closes #252 --- Sources/Subprocess/AsyncBufferSequence.swift | 8 +- Sources/Subprocess/Configuration.swift | 28 +--- Sources/Subprocess/IO/AsyncIO+Dispatch.swift | 134 ++++++++++++------- Sources/Subprocess/IO/AsyncIO+Linux.swift | 51 ++++++- Sources/Subprocess/IO/AsyncIO+Windows.swift | 50 ++++++- Sources/Subprocess/IO/Output.swift | 4 +- Tests/SubprocessTests/AsyncIOTests.swift | 6 +- Tests/SubprocessTests/IntegrationTests.swift | 92 ++++++++++++- Tests/SubprocessTests/UnixTests.swift | 3 +- 9 files changed, 277 insertions(+), 99 deletions(-) diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index 17429cfb..43b7339d 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -38,9 +38,7 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { /// The element type for the asynchronous sequence. public typealias Element = Buffer - #if SUBPROCESS_ASYNCIO_DISPATCH - internal typealias DiskIO = DispatchIO - #elseif canImport(WinSDK) + #if canImport(WinSDK) internal typealias DiskIO = HANDLE #else internal typealias DiskIO = FileDescriptor @@ -74,9 +72,7 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { ) guard let data else { // We finished reading. Close the file descriptor now - #if SUBPROCESS_ASYNCIO_DISPATCH - try _safelyClose(.dispatchIO(self.diskIO)) - #elseif canImport(WinSDK) + #if canImport(WinSDK) try _safelyClose(.handle(self.diskIO)) #else try _safelyClose(.fileDescriptor(self.diskIO)) diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index 3edfe42e..708866a0 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -760,7 +760,6 @@ internal enum _CloseTarget { #else case fileDescriptor(FileDescriptor) #endif - case dispatchIO(DispatchIO) } internal func _safelyClose(_ target: _CloseTarget) throws(SubprocessError) { @@ -822,8 +821,6 @@ internal func _safelyClose(_ target: _CloseTarget) throws(SubprocessError) { ) } #endif - case .dispatchIO(let dispatchIO): - dispatchIO.close() } } @@ -879,24 +876,7 @@ internal struct IODescriptor: ~Copyable { consuming func createIOChannel() -> IOChannel { let shouldClose = self.closeWhenDone self.closeWhenDone = false - #if SUBPROCESS_ASYNCIO_DISPATCH - // Transferring out the ownership of fileDescriptor means we don't have go close here - let closeFd = self.descriptor - let dispatchIO: DispatchIO = DispatchIO( - type: .stream, - fileDescriptor: self.platformDescriptor(), - queue: .global(), - cleanupHandler: { @Sendable error in - // Close the file descriptor - if shouldClose { - try? closeFd.close() - } - } - ) - return IOChannel(dispatchIO, closeWhenDone: shouldClose) - #else return IOChannel(self.descriptor, closeWhenDone: shouldClose) - #endif } internal mutating func safelyClose() throws(SubprocessError) { @@ -934,9 +914,7 @@ internal struct IODescriptor: ~Copyable { } internal struct IOChannel: ~Copyable, @unchecked Sendable { - #if SUBPROCESS_ASYNCIO_DISPATCH - typealias Channel = DispatchIO - #elseif canImport(WinSDK) + #if canImport(WinSDK) typealias Channel = HANDLE #else typealias Channel = FileDescriptor @@ -959,9 +937,7 @@ internal struct IOChannel: ~Copyable, @unchecked Sendable { } closeWhenDone = false - #if SUBPROCESS_ASYNCIO_DISPATCH - try _safelyClose(.dispatchIO(self.channel)) - #elseif canImport(WinSDK) + #if canImport(WinSDK) try _safelyClose(.handle(self.channel)) #else try _safelyClose(.fileDescriptor(self.channel)) diff --git a/Sources/Subprocess/IO/AsyncIO+Dispatch.swift b/Sources/Subprocess/IO/AsyncIO+Dispatch.swift index b8a438bd..f667490d 100644 --- a/Sources/Subprocess/IO/AsyncIO+Dispatch.swift +++ b/Sources/Subprocess/IO/AsyncIO+Dispatch.swift @@ -33,49 +33,81 @@ final class AsyncIO: Sendable { from diskIO: borrowing IOChannel, upTo maxLength: Int ) async throws(SubprocessError) -> DispatchData? { - return try await self.read( - from: diskIO.channel, - upTo: maxLength, + if maxLength == .max { + return try await self.readAll(from: diskIO, upTo: maxLength) + } + return try await self.readChunk( + fd: diskIO.channel.rawValue, + maxLength: maxLength, ) } internal func read( - from dispatchIO: DispatchIO, + from fileDescriptor: FileDescriptor, + upTo maxLength: Int + ) async throws(SubprocessError) -> DispatchData? { + if maxLength == .max { + return try await self.readAll( + fd: fileDescriptor.rawValue, + upTo: maxLength + ) + } + return try await self.readChunk( + fd: fileDescriptor.rawValue, + maxLength: maxLength, + ) + } + + internal func readAll( + from diskIO: borrowing IOChannel, + upTo maxLength: Int + ) async throws(SubprocessError) -> DispatchData? { + return try await self.readAll( + fd: diskIO.channel.rawValue, + upTo: maxLength + ) + } + + private func readAll( + fd: Int32, upTo maxLength: Int + ) async throws(SubprocessError) -> DispatchData? { + var accumulated: DispatchData = .empty + while accumulated.count < maxLength { + if Task.isCancelled { + throw SubprocessError.asyncIOFailed( + reason: "Cancelled", + underlyingError: nil + ) + } + let remaining = maxLength == .max ? .max : maxLength - accumulated.count + let chunk: DispatchData? = try await self.readChunk(fd: fd, maxLength: remaining) + guard let chunk else { break } + accumulated.append(chunk) + } + return accumulated.isEmpty ? nil : accumulated + } + + private func readChunk( + fd: Int32, + maxLength: Int ) async throws(SubprocessError) -> DispatchData? { // https://github.com/swiftlang/swift/issues/87810 return try await _castError { return try await withCheckedThrowingContinuation { continuation in - var buffer: DispatchData = .empty - dispatchIO.read( - offset: 0, - length: maxLength, - queue: DispatchQueue(label: "SubprocessReadQueue") - ) { done, data, error in + DispatchIO.read( + fromFileDescriptor: fd, + maxLength: maxLength, + runningHandlerOn: .global() + ) { data, error in if error != 0 { continuation.resume( - throwing: - SubprocessError - .failedToReadFromProcess( - withUnderlyingError: Errno(rawValue: error) - ) - ) + throwing: SubprocessError.failedToReadFromProcess( + withUnderlyingError: Errno(rawValue: error) + )) return } - if let data { - if buffer.isEmpty { - buffer = data - } else { - buffer.append(data) - } - } - if done { - if !buffer.isEmpty { - continuation.resume(returning: buffer) - } else { - continuation.resume(returning: nil) - } - } + continuation.resume(returning: data.isEmpty ? nil : data) } } } @@ -147,27 +179,31 @@ final class AsyncIO: Sendable { queue: DispatchQueue = .global(), completion: @escaping (Int, SubprocessError?) -> Void ) { - diskIO.channel.write( - offset: 0, - data: dispatchData, - queue: queue - ) { done, unwritten, error in - guard done else { - // Wait until we are done writing or encountered some error - return - } - - let unwrittenLength = unwritten?.count ?? 0 - let writtenLength = dispatchData.count - unwrittenLength - guard error != 0 else { - completion(writtenLength, nil) - return + let fd = diskIO.channel.rawValue + var totalWritten = 0 + func writeRemaining(_ data: DispatchData) { + DispatchIO.write( + toFileDescriptor: fd, + data: data, + runningHandlerOn: queue + ) { unwritten, error in + let unwrittenLength = unwritten?.count ?? 0 + totalWritten += data.count - unwrittenLength + if error != 0 { + completion( + totalWritten, + .failedToWriteToProcess(withUnderlyingError: Errno(rawValue: error)) + ) + return + } + if let unwritten, !unwritten.isEmpty { + writeRemaining(unwritten) + } else { + completion(totalWritten, nil) + } } - completion( - writtenLength, - .failedToWriteToProcess(withUnderlyingError: Errno(rawValue: error)) - ) } + writeRemaining(dispatchData) } } diff --git a/Sources/Subprocess/IO/AsyncIO+Linux.swift b/Sources/Subprocess/IO/AsyncIO+Linux.swift index 9baf1a0f..84322eb4 100644 --- a/Sources/Subprocess/IO/AsyncIO+Linux.swift +++ b/Sources/Subprocess/IO/AsyncIO+Linux.swift @@ -332,12 +332,55 @@ extension AsyncIO { from diskIO: borrowing IOChannel, upTo maxLength: Int ) async throws(SubprocessError) -> [UInt8]? { - return try await self.read(from: diskIO.channel, upTo: maxLength) + if maxLength == .max { + return try await self._readAll(from: diskIO.channel, upTo: maxLength) + } + return try await self._readChunk(from: diskIO.channel, upTo: maxLength) } func read( from fileDescriptor: FileDescriptor, upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + if maxLength == .max { + return try await self._readAll(from: fileDescriptor, upTo: maxLength) + } + return try await self._readChunk(from: fileDescriptor, upTo: maxLength) + } + + func readAll( + from diskIO: borrowing IOChannel, + upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + return try await self._readAll(from: diskIO.channel, upTo: maxLength) + } + + private func _readChunk( + from fileDescriptor: FileDescriptor, + upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + return try await self._read( + from: fileDescriptor, + upTo: maxLength, + returnOnFirstRead: true + ) + } + + private func _readAll( + from fileDescriptor: FileDescriptor, + upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + return try await self._read( + from: fileDescriptor, + upTo: maxLength, + returnOnFirstRead: false + ) + } + + private func _read( + from fileDescriptor: FileDescriptor, + upTo maxLength: Int, + returnOnFirstRead: Bool ) async throws(SubprocessError) -> [UInt8]? { guard maxLength > 0 else { return nil @@ -375,7 +418,11 @@ extension AsyncIO { if bytesRead > 0 { // Read some data readLength += bytesRead - if maxLength == .max { + if returnOnFirstRead { + try self.removeRegistration(for: fileDescriptor) + resultBuffer.removeLast(resultBuffer.count - readLength) + return resultBuffer + } else if maxLength == .max { // Grow resultBuffer if needed guard Double(readLength) > 0.8 * Double(resultBuffer.count) else { continue diff --git a/Sources/Subprocess/IO/AsyncIO+Windows.swift b/Sources/Subprocess/IO/AsyncIO+Windows.swift index b3cfbce3..c8dff9fe 100644 --- a/Sources/Subprocess/IO/AsyncIO+Windows.swift +++ b/Sources/Subprocess/IO/AsyncIO+Windows.swift @@ -246,12 +246,55 @@ final class AsyncIO: @unchecked Sendable { from diskIO: borrowing IOChannel, upTo maxLength: Int ) async throws(SubprocessError) -> [UInt8]? { - return try await self.read(from: diskIO.channel, upTo: maxLength) + if maxLength == .max { + return try await self._readAll(from: diskIO.channel, upTo: maxLength) + } + return try await self._readChunk(from: diskIO.channel, upTo: maxLength) } func read( from handle: HANDLE, upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + if maxLength == .max { + return try await self._readAll(from: handle, upTo: maxLength) + } + return try await self._readChunk(from: handle, upTo: maxLength) + } + + func readAll( + from diskIO: borrowing IOChannel, + upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + return try await self._readAll(from: diskIO.channel, upTo: maxLength) + } + + private func _readChunk( + from handle: HANDLE, + upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + return try await self._read( + from: handle, + upTo: maxLength, + returnOnFirstRead: true + ) + } + + private func _readAll( + from handle: HANDLE, + upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + return try await self._read( + from: handle, + upTo: maxLength, + returnOnFirstRead: false + ) + } + + private func _read( + from handle: HANDLE, + upTo maxLength: Int, + returnOnFirstRead: Bool ) async throws(SubprocessError) -> [UInt8]? { guard maxLength > 0 else { return nil @@ -333,7 +376,10 @@ final class AsyncIO: @unchecked Sendable { } else { // Read some data readLength += Int(truncatingIfNeeded: bytesRead) - if maxLength == .max { + if returnOnFirstRead { + resultBuffer.removeLast(resultBuffer.count - readLength) + return resultBuffer + } else if maxLength == .max { // Grow resultBuffer if needed guard Double(readLength) > 0.8 * Double(resultBuffer.count) else { continue diff --git a/Sources/Subprocess/IO/Output.swift b/Sources/Subprocess/IO/Output.swift index b0a14495..4463e711 100644 --- a/Sources/Subprocess/IO/Output.swift +++ b/Sources/Subprocess/IO/Output.swift @@ -141,7 +141,7 @@ public struct BytesOutput: OutputProtocol, ErrorOutputProtocol { // more byte to determine whether output exceeds the limit maxLength += 1 } - result = try await AsyncIO.shared.read(from: diskIO, upTo: maxLength) + result = try await AsyncIO.shared.readAll(from: diskIO, upTo: maxLength) } catch { try diskIO.safelyClose() throw error @@ -349,7 +349,7 @@ extension OutputProtocol { // more byte to determine whether output exceeds the limit maxLength += 1 } - result = try await AsyncIO.shared.read(from: diskIO, upTo: maxLength) + result = try await AsyncIO.shared.readAll(from: diskIO, upTo: maxLength) } catch { try diskIO.safelyClose() throw error diff --git a/Tests/SubprocessTests/AsyncIOTests.swift b/Tests/SubprocessTests/AsyncIOTests.swift index 0626df12..a63acc7f 100644 --- a/Tests/SubprocessTests/AsyncIOTests.swift +++ b/Tests/SubprocessTests/AsyncIOTests.swift @@ -71,7 +71,7 @@ extension SubprocessAsyncIOTests { try await runReadWriteTest { readIO, readTestBed in for expectedChunk in chunks { let readData = try #require( - try await readIO.read(from: readTestBed.ioChannel, upTo: expectedChunk.count) + try await readIO.readAll(from: readTestBed.ioChannel, upTo: expectedChunk.count) ) #expect(Array(readData) == expectedChunk) } @@ -285,9 +285,7 @@ extension SubprocessAsyncIOTests { extension SubprocessAsyncIOTests.TestBed { consuming func finish() async throws { - #if SUBPROCESS_ASYNCIO_DISPATCH - try _safelyClose(.dispatchIO(self.ioChannel.channel)) - #elseif canImport(WinSDK) + #if canImport(WinSDK) try _safelyClose(.handle(self.ioChannel.channel)) #else try _safelyClose(.fileDescriptor(self.ioChannel.channel)) diff --git a/Tests/SubprocessTests/IntegrationTests.swift b/Tests/SubprocessTests/IntegrationTests.swift index 19b5b922..91f89336 100644 --- a/Tests/SubprocessTests/IntegrationTests.swift +++ b/Tests/SubprocessTests/IntegrationTests.swift @@ -1833,7 +1833,7 @@ extension SubprocessIntegrationTests { #expect(result.standardError?.trimmingNewLineAndQuotes() == "") } - @Test func testCustomStreamingBufferSize() async throws { + @Test func testStreamingSmallInputs() async throws { #if os(Windows) let setup = TestSetup( executable: .name("cmd.exe"), @@ -1863,13 +1863,12 @@ extension SubprocessIntegrationTests { _ = try await _run( setup, input: .none, - error: .discarded, - preferredBufferSize: 1 + error: .discarded ) { execution, standardOutput in for try await line in standardOutput.strings() { - // If we use default buffer size this test will hang - // because Subprocess is stuck on waiting 16k worth of - // output when there are only 3. + // This test should not hang when there are only + // 3 bytes of output even though Subprocess' + // internal buffer size is something like 16k. #expect(line.trimmingNewLineAndQuotes() == "one") // Kill the child process since it intentionally hang #if os(Windows) @@ -2315,6 +2314,87 @@ extension SubprocessIntegrationTests { } } + @Test( + .disabled( + "Flaky on FreeBSD in GitHub Actions", + { + #if os(FreeBSD) + return true + #else + return false + #endif + })) func testCancelAllTearsDownProcess() async throws + { + enum WhoReturned { + case okReceived + case finished(Subprocess.ExecutionOutcome) + } + struct WrongLineReceived: Error { + var line: String + } + struct NoOutputReceived: Error { + } + + let (okSignal, okSignalContinuation) = AsyncThrowingStream.makeStream(of: Void.self) + + try await withThrowingTaskGroup(of: WhoReturned.self) { group in + group.addTask { + let executable: Subprocess.Executable + let arguments: Subprocess.Arguments + #if os(Windows) + executable = .name("powershell.exe") + arguments = ["-Command", "echo OK ; Start-Sleep -Seconds 9999"] + #else + executable = .path("/bin/sh") + arguments = [ + "-c", + """ + echo OK; + while true; do sleep 1; done + """, + ] + #endif + let outcome = try await Subprocess.run( + executable, + arguments: arguments, + error: .discarded + ) { execution, outputSequence in + for try await line in outputSequence.strings(separatedBy: .lineBreaks) { + if line == "OK" { + okSignalContinuation.yield(()) + okSignalContinuation.finish() + } else { + Issue.record("unexpected output: \(line)") + okSignalContinuation.finish(throwing: WrongLineReceived(line: line)) + } + return + } + Issue.record("no output") + okSignalContinuation.finish(throwing: NoOutputReceived()) + } + return .finished(outcome) + } + + group.addTask { + var iterator = okSignal.makeAsyncIterator() + _ = try await iterator.next() + return .okReceived + } + + var finalOutcome: Subprocess.ExecutionOutcome? = nil + while let taskResult = try await group.next() { + switch taskResult { + case .okReceived: + group.cancelAll() + case .finished(let outcome): + finalOutcome = outcome + } + } + + #expect(finalOutcome != nil) + } + } + @Test func testExitCode() async throws { for exitCode in UInt8.min.. Date: Thu, 30 Apr 2026 16:30:28 -0700 Subject: [PATCH 2/2] Fix flaky testSubprocessDoesNotInheritRandomFileDescriptors on macOS CI Move the test pipe write end to fd 1000 via FileDescriptor.duplicate to avoid interaction with library-internal pipe fds that share nearby fd numbers on macOS 26, where POSIX_SPAWN_CLOEXEC_DEFAULT appears to exempt source fds of adddup2 file actions from close-on-exec. Co-Authored-By: Claude Opus 4.6 (1M context) --- Tests/SubprocessTests/UnixTests.swift | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/Tests/SubprocessTests/UnixTests.swift b/Tests/SubprocessTests/UnixTests.swift index 96722557..0043cec2 100644 --- a/Tests/SubprocessTests/UnixTests.swift +++ b/Tests/SubprocessTests/UnixTests.swift @@ -453,16 +453,21 @@ extension SubprocessUnixTests { @Test(.requiresBash) func testSubprocessDoesNotInheritRandomFileDescriptors() async throws { let pipe = try FileDescriptor.ssp_pipe() + // Move write end to a high fd to avoid interaction with library-internal fds + // that may share the same fd number on some platforms + let testWriteEnd = try pipe.writeEnd.duplicate(as: FileDescriptor(rawValue: 1000)) + try pipe.writeEnd.close() + try await pipe.readEnd.closeAfter { - let result = try await pipe.writeEnd.closeAfter { + let result = try await testWriteEnd.closeAfter { // Spawn bash and then attempt to write to the write end try await Subprocess.run( .name("bash"), arguments: [ "-c", """ - echo this string should be discarded >&\(pipe.writeEnd.rawValue); - echo wrote into \(pipe.writeEnd.rawValue), echo exit code $?; + echo this string should be discarded >&\(testWriteEnd.rawValue); + echo wrote into \(testWriteEnd.rawValue), echo exit code $?; """, ], input: .none, @@ -479,7 +484,7 @@ extension SubprocessUnixTests { } #expect(readCount == 0) #expect( - result.standardOutput?.trimmingNewLineAndQuotes() == "wrote into \(pipe.writeEnd.rawValue), echo exit code 1" + result.standardOutput?.trimmingNewLineAndQuotes() == "wrote into \(testWriteEnd.rawValue), echo exit code 1" ) } }