diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index 17429cfb..43b7339d 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -38,9 +38,7 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { /// The element type for the asynchronous sequence. public typealias Element = Buffer - #if SUBPROCESS_ASYNCIO_DISPATCH - internal typealias DiskIO = DispatchIO - #elseif canImport(WinSDK) + #if canImport(WinSDK) internal typealias DiskIO = HANDLE #else internal typealias DiskIO = FileDescriptor @@ -74,9 +72,7 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { ) guard let data else { // We finished reading. Close the file descriptor now - #if SUBPROCESS_ASYNCIO_DISPATCH - try _safelyClose(.dispatchIO(self.diskIO)) - #elseif canImport(WinSDK) + #if canImport(WinSDK) try _safelyClose(.handle(self.diskIO)) #else try _safelyClose(.fileDescriptor(self.diskIO)) diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index 3edfe42e..708866a0 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -760,7 +760,6 @@ internal enum _CloseTarget { #else case fileDescriptor(FileDescriptor) #endif - case dispatchIO(DispatchIO) } internal func _safelyClose(_ target: _CloseTarget) throws(SubprocessError) { @@ -822,8 +821,6 @@ internal func _safelyClose(_ target: _CloseTarget) throws(SubprocessError) { ) } #endif - case .dispatchIO(let dispatchIO): - dispatchIO.close() } } @@ -879,24 +876,7 @@ internal struct IODescriptor: ~Copyable { consuming func createIOChannel() -> IOChannel { let shouldClose = self.closeWhenDone self.closeWhenDone = false - #if SUBPROCESS_ASYNCIO_DISPATCH - // Transferring out the ownership of fileDescriptor means we don't have go close here - let closeFd = self.descriptor - let dispatchIO: DispatchIO = DispatchIO( - type: .stream, - fileDescriptor: self.platformDescriptor(), - queue: .global(), - cleanupHandler: { @Sendable error in - // Close the file descriptor - if shouldClose { - try? closeFd.close() - } - } - ) - return IOChannel(dispatchIO, closeWhenDone: shouldClose) - #else return IOChannel(self.descriptor, closeWhenDone: shouldClose) - #endif } internal mutating func safelyClose() throws(SubprocessError) { @@ -934,9 +914,7 @@ internal struct IODescriptor: ~Copyable { } internal struct IOChannel: ~Copyable, @unchecked Sendable { - #if SUBPROCESS_ASYNCIO_DISPATCH - typealias Channel = DispatchIO - #elseif canImport(WinSDK) + #if canImport(WinSDK) typealias Channel = HANDLE #else typealias Channel = FileDescriptor @@ -959,9 +937,7 @@ internal struct IOChannel: ~Copyable, @unchecked Sendable { } closeWhenDone = false - #if SUBPROCESS_ASYNCIO_DISPATCH - try _safelyClose(.dispatchIO(self.channel)) - #elseif canImport(WinSDK) + #if canImport(WinSDK) try _safelyClose(.handle(self.channel)) #else try _safelyClose(.fileDescriptor(self.channel)) diff --git a/Sources/Subprocess/IO/AsyncIO+Dispatch.swift b/Sources/Subprocess/IO/AsyncIO+Dispatch.swift index b8a438bd..f667490d 100644 --- a/Sources/Subprocess/IO/AsyncIO+Dispatch.swift +++ b/Sources/Subprocess/IO/AsyncIO+Dispatch.swift @@ -33,49 +33,81 @@ final class AsyncIO: Sendable { from diskIO: borrowing IOChannel, upTo maxLength: Int ) async throws(SubprocessError) -> DispatchData? { - return try await self.read( - from: diskIO.channel, - upTo: maxLength, + if maxLength == .max { + return try await self.readAll(from: diskIO, upTo: maxLength) + } + return try await self.readChunk( + fd: diskIO.channel.rawValue, + maxLength: maxLength, ) } internal func read( - from dispatchIO: DispatchIO, + from fileDescriptor: FileDescriptor, + upTo maxLength: Int + ) async throws(SubprocessError) -> DispatchData? { + if maxLength == .max { + return try await self.readAll( + fd: fileDescriptor.rawValue, + upTo: maxLength + ) + } + return try await self.readChunk( + fd: fileDescriptor.rawValue, + maxLength: maxLength, + ) + } + + internal func readAll( + from diskIO: borrowing IOChannel, + upTo maxLength: Int + ) async throws(SubprocessError) -> DispatchData? { + return try await self.readAll( + fd: diskIO.channel.rawValue, + upTo: maxLength + ) + } + + private func readAll( + fd: Int32, upTo maxLength: Int + ) async throws(SubprocessError) -> DispatchData? { + var accumulated: DispatchData = .empty + while accumulated.count < maxLength { + if Task.isCancelled { + throw SubprocessError.asyncIOFailed( + reason: "Cancelled", + underlyingError: nil + ) + } + let remaining = maxLength == .max ? .max : maxLength - accumulated.count + let chunk: DispatchData? = try await self.readChunk(fd: fd, maxLength: remaining) + guard let chunk else { break } + accumulated.append(chunk) + } + return accumulated.isEmpty ? nil : accumulated + } + + private func readChunk( + fd: Int32, + maxLength: Int ) async throws(SubprocessError) -> DispatchData? { // https://github.com/swiftlang/swift/issues/87810 return try await _castError { return try await withCheckedThrowingContinuation { continuation in - var buffer: DispatchData = .empty - dispatchIO.read( - offset: 0, - length: maxLength, - queue: DispatchQueue(label: "SubprocessReadQueue") - ) { done, data, error in + DispatchIO.read( + fromFileDescriptor: fd, + maxLength: maxLength, + runningHandlerOn: .global() + ) { data, error in if error != 0 { continuation.resume( - throwing: - SubprocessError - .failedToReadFromProcess( - withUnderlyingError: Errno(rawValue: error) - ) - ) + throwing: SubprocessError.failedToReadFromProcess( + withUnderlyingError: Errno(rawValue: error) + )) return } - if let data { - if buffer.isEmpty { - buffer = data - } else { - buffer.append(data) - } - } - if done { - if !buffer.isEmpty { - continuation.resume(returning: buffer) - } else { - continuation.resume(returning: nil) - } - } + continuation.resume(returning: data.isEmpty ? nil : data) } } } @@ -147,27 +179,31 @@ final class AsyncIO: Sendable { queue: DispatchQueue = .global(), completion: @escaping (Int, SubprocessError?) -> Void ) { - diskIO.channel.write( - offset: 0, - data: dispatchData, - queue: queue - ) { done, unwritten, error in - guard done else { - // Wait until we are done writing or encountered some error - return - } - - let unwrittenLength = unwritten?.count ?? 0 - let writtenLength = dispatchData.count - unwrittenLength - guard error != 0 else { - completion(writtenLength, nil) - return + let fd = diskIO.channel.rawValue + var totalWritten = 0 + func writeRemaining(_ data: DispatchData) { + DispatchIO.write( + toFileDescriptor: fd, + data: data, + runningHandlerOn: queue + ) { unwritten, error in + let unwrittenLength = unwritten?.count ?? 0 + totalWritten += data.count - unwrittenLength + if error != 0 { + completion( + totalWritten, + .failedToWriteToProcess(withUnderlyingError: Errno(rawValue: error)) + ) + return + } + if let unwritten, !unwritten.isEmpty { + writeRemaining(unwritten) + } else { + completion(totalWritten, nil) + } } - completion( - writtenLength, - .failedToWriteToProcess(withUnderlyingError: Errno(rawValue: error)) - ) } + writeRemaining(dispatchData) } } diff --git a/Sources/Subprocess/IO/AsyncIO+Linux.swift b/Sources/Subprocess/IO/AsyncIO+Linux.swift index 9baf1a0f..84322eb4 100644 --- a/Sources/Subprocess/IO/AsyncIO+Linux.swift +++ b/Sources/Subprocess/IO/AsyncIO+Linux.swift @@ -332,12 +332,55 @@ extension AsyncIO { from diskIO: borrowing IOChannel, upTo maxLength: Int ) async throws(SubprocessError) -> [UInt8]? { - return try await self.read(from: diskIO.channel, upTo: maxLength) + if maxLength == .max { + return try await self._readAll(from: diskIO.channel, upTo: maxLength) + } + return try await self._readChunk(from: diskIO.channel, upTo: maxLength) } func read( from fileDescriptor: FileDescriptor, upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + if maxLength == .max { + return try await self._readAll(from: fileDescriptor, upTo: maxLength) + } + return try await self._readChunk(from: fileDescriptor, upTo: maxLength) + } + + func readAll( + from diskIO: borrowing IOChannel, + upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + return try await self._readAll(from: diskIO.channel, upTo: maxLength) + } + + private func _readChunk( + from fileDescriptor: FileDescriptor, + upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + return try await self._read( + from: fileDescriptor, + upTo: maxLength, + returnOnFirstRead: true + ) + } + + private func _readAll( + from fileDescriptor: FileDescriptor, + upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + return try await self._read( + from: fileDescriptor, + upTo: maxLength, + returnOnFirstRead: false + ) + } + + private func _read( + from fileDescriptor: FileDescriptor, + upTo maxLength: Int, + returnOnFirstRead: Bool ) async throws(SubprocessError) -> [UInt8]? { guard maxLength > 0 else { return nil @@ -375,7 +418,11 @@ extension AsyncIO { if bytesRead > 0 { // Read some data readLength += bytesRead - if maxLength == .max { + if returnOnFirstRead { + try self.removeRegistration(for: fileDescriptor) + resultBuffer.removeLast(resultBuffer.count - readLength) + return resultBuffer + } else if maxLength == .max { // Grow resultBuffer if needed guard Double(readLength) > 0.8 * Double(resultBuffer.count) else { continue diff --git a/Sources/Subprocess/IO/AsyncIO+Windows.swift b/Sources/Subprocess/IO/AsyncIO+Windows.swift index b3cfbce3..c8dff9fe 100644 --- a/Sources/Subprocess/IO/AsyncIO+Windows.swift +++ b/Sources/Subprocess/IO/AsyncIO+Windows.swift @@ -246,12 +246,55 @@ final class AsyncIO: @unchecked Sendable { from diskIO: borrowing IOChannel, upTo maxLength: Int ) async throws(SubprocessError) -> [UInt8]? { - return try await self.read(from: diskIO.channel, upTo: maxLength) + if maxLength == .max { + return try await self._readAll(from: diskIO.channel, upTo: maxLength) + } + return try await self._readChunk(from: diskIO.channel, upTo: maxLength) } func read( from handle: HANDLE, upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + if maxLength == .max { + return try await self._readAll(from: handle, upTo: maxLength) + } + return try await self._readChunk(from: handle, upTo: maxLength) + } + + func readAll( + from diskIO: borrowing IOChannel, + upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + return try await self._readAll(from: diskIO.channel, upTo: maxLength) + } + + private func _readChunk( + from handle: HANDLE, + upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + return try await self._read( + from: handle, + upTo: maxLength, + returnOnFirstRead: true + ) + } + + private func _readAll( + from handle: HANDLE, + upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + return try await self._read( + from: handle, + upTo: maxLength, + returnOnFirstRead: false + ) + } + + private func _read( + from handle: HANDLE, + upTo maxLength: Int, + returnOnFirstRead: Bool ) async throws(SubprocessError) -> [UInt8]? { guard maxLength > 0 else { return nil @@ -333,7 +376,10 @@ final class AsyncIO: @unchecked Sendable { } else { // Read some data readLength += Int(truncatingIfNeeded: bytesRead) - if maxLength == .max { + if returnOnFirstRead { + resultBuffer.removeLast(resultBuffer.count - readLength) + return resultBuffer + } else if maxLength == .max { // Grow resultBuffer if needed guard Double(readLength) > 0.8 * Double(resultBuffer.count) else { continue diff --git a/Sources/Subprocess/IO/Output.swift b/Sources/Subprocess/IO/Output.swift index b0a14495..4463e711 100644 --- a/Sources/Subprocess/IO/Output.swift +++ b/Sources/Subprocess/IO/Output.swift @@ -141,7 +141,7 @@ public struct BytesOutput: OutputProtocol, ErrorOutputProtocol { // more byte to determine whether output exceeds the limit maxLength += 1 } - result = try await AsyncIO.shared.read(from: diskIO, upTo: maxLength) + result = try await AsyncIO.shared.readAll(from: diskIO, upTo: maxLength) } catch { try diskIO.safelyClose() throw error @@ -349,7 +349,7 @@ extension OutputProtocol { // more byte to determine whether output exceeds the limit maxLength += 1 } - result = try await AsyncIO.shared.read(from: diskIO, upTo: maxLength) + result = try await AsyncIO.shared.readAll(from: diskIO, upTo: maxLength) } catch { try diskIO.safelyClose() throw error diff --git a/Tests/SubprocessTests/AsyncIOTests.swift b/Tests/SubprocessTests/AsyncIOTests.swift index 0626df12..a63acc7f 100644 --- a/Tests/SubprocessTests/AsyncIOTests.swift +++ b/Tests/SubprocessTests/AsyncIOTests.swift @@ -71,7 +71,7 @@ extension SubprocessAsyncIOTests { try await runReadWriteTest { readIO, readTestBed in for expectedChunk in chunks { let readData = try #require( - try await readIO.read(from: readTestBed.ioChannel, upTo: expectedChunk.count) + try await readIO.readAll(from: readTestBed.ioChannel, upTo: expectedChunk.count) ) #expect(Array(readData) == expectedChunk) } @@ -285,9 +285,7 @@ extension SubprocessAsyncIOTests { extension SubprocessAsyncIOTests.TestBed { consuming func finish() async throws { - #if SUBPROCESS_ASYNCIO_DISPATCH - try _safelyClose(.dispatchIO(self.ioChannel.channel)) - #elseif canImport(WinSDK) + #if canImport(WinSDK) try _safelyClose(.handle(self.ioChannel.channel)) #else try _safelyClose(.fileDescriptor(self.ioChannel.channel)) diff --git a/Tests/SubprocessTests/IntegrationTests.swift b/Tests/SubprocessTests/IntegrationTests.swift index 19b5b922..91f89336 100644 --- a/Tests/SubprocessTests/IntegrationTests.swift +++ b/Tests/SubprocessTests/IntegrationTests.swift @@ -1833,7 +1833,7 @@ extension SubprocessIntegrationTests { #expect(result.standardError?.trimmingNewLineAndQuotes() == "") } - @Test func testCustomStreamingBufferSize() async throws { + @Test func testStreamingSmallInputs() async throws { #if os(Windows) let setup = TestSetup( executable: .name("cmd.exe"), @@ -1863,13 +1863,12 @@ extension SubprocessIntegrationTests { _ = try await _run( setup, input: .none, - error: .discarded, - preferredBufferSize: 1 + error: .discarded ) { execution, standardOutput in for try await line in standardOutput.strings() { - // If we use default buffer size this test will hang - // because Subprocess is stuck on waiting 16k worth of - // output when there are only 3. + // This test should not hang when there are only + // 3 bytes of output even though Subprocess' + // internal buffer size is something like 16k. #expect(line.trimmingNewLineAndQuotes() == "one") // Kill the child process since it intentionally hang #if os(Windows) @@ -2315,6 +2314,87 @@ extension SubprocessIntegrationTests { } } + @Test( + .disabled( + "Flaky on FreeBSD in GitHub Actions", + { + #if os(FreeBSD) + return true + #else + return false + #endif + })) func testCancelAllTearsDownProcess() async throws + { + enum WhoReturned { + case okReceived + case finished(Subprocess.ExecutionOutcome) + } + struct WrongLineReceived: Error { + var line: String + } + struct NoOutputReceived: Error { + } + + let (okSignal, okSignalContinuation) = AsyncThrowingStream.makeStream(of: Void.self) + + try await withThrowingTaskGroup(of: WhoReturned.self) { group in + group.addTask { + let executable: Subprocess.Executable + let arguments: Subprocess.Arguments + #if os(Windows) + executable = .name("powershell.exe") + arguments = ["-Command", "echo OK ; Start-Sleep -Seconds 9999"] + #else + executable = .path("/bin/sh") + arguments = [ + "-c", + """ + echo OK; + while true; do sleep 1; done + """, + ] + #endif + let outcome = try await Subprocess.run( + executable, + arguments: arguments, + error: .discarded + ) { execution, outputSequence in + for try await line in outputSequence.strings(separatedBy: .lineBreaks) { + if line == "OK" { + okSignalContinuation.yield(()) + okSignalContinuation.finish() + } else { + Issue.record("unexpected output: \(line)") + okSignalContinuation.finish(throwing: WrongLineReceived(line: line)) + } + return + } + Issue.record("no output") + okSignalContinuation.finish(throwing: NoOutputReceived()) + } + return .finished(outcome) + } + + group.addTask { + var iterator = okSignal.makeAsyncIterator() + _ = try await iterator.next() + return .okReceived + } + + var finalOutcome: Subprocess.ExecutionOutcome? = nil + while let taskResult = try await group.next() { + switch taskResult { + case .okReceived: + group.cancelAll() + case .finished(let outcome): + finalOutcome = outcome + } + } + + #expect(finalOutcome != nil) + } + } + @Test func testExitCode() async throws { for exitCode in UInt8.min..&\(pipe.writeEnd.rawValue); - echo wrote into \(pipe.writeEnd.rawValue), echo exit code $?; + echo this string should be discarded >&\(testWriteEnd.rawValue); + echo wrote into \(testWriteEnd.rawValue), echo exit code $?; """, ], input: .none, @@ -480,7 +484,7 @@ extension SubprocessUnixTests { } #expect(readCount == 0) #expect( - result.standardOutput?.trimmingNewLineAndQuotes() == "wrote into \(pipe.writeEnd.rawValue), echo exit code 1" + result.standardOutput?.trimmingNewLineAndQuotes() == "wrote into \(testWriteEnd.rawValue), echo exit code 1" ) } }