From 44f51b9af4614c97c5dcf6291f3a296ca04303cc Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Wed, 29 Apr 2026 11:06:37 -0700 Subject: [PATCH] Make AsyncIO.read's maxLength a cap and rewrite Darwin backend Previously AsyncIO.read(upTo: maxLength) treated maxLength as a target length. It would accumulate bytes in a loop until it had gathered that many before returning. This deadlocks when a subprocess writes fewer bytes than maxLength and then pauses or waits on stdin: the parent blocks forever on data the child never sends. This commit changes maxLength to a "cap", as its name suggests. Each read(upTo:) now returns as soon as the underlying nonblocking read(2) (or ReadFile) completes. Callers that need a specific byte count accumulate across calls themselves. Follow-on cleanup enabled by the semantics fix: - Replace Darwin's DispatchIO backend with kqueue + read(2). The new Darwin path is a thin event-source shim over the same nonblocking read/write loop that Linux and Android already use, sharing implementation in a new IO/AsyncIO+Unix.swift. - Collapse IOChannel into IODescriptor. IOChannel only existed to wrap a DispatchIO; with DispatchIO gone, a single fd-owning type suffices. - Drop the public `preferredBufferSize` parameter from every run() overload. It was introduced to let callers work around the old hang; buffer size is now derived from the pipe's capacity via F_GETPIPE_SZ (Linux/Android), fstat().st_blksize (Darwin/OpenBSD), or a fixed 64 KB fallback (FreeBSD, Windows). 
--- Package.swift | 2 +- Sources/Subprocess/API.swift | 181 +++----- Sources/Subprocess/AsyncBufferSequence.swift | 51 +-- Sources/Subprocess/Buffer.swift | 105 ----- Sources/Subprocess/CMakeLists.txt | 7 +- Sources/Subprocess/Configuration.swift | 166 +++----- Sources/Subprocess/Error.swift | 9 + Sources/Subprocess/IO/AsyncIO+Dispatch.swift | 186 -------- Sources/Subprocess/IO/AsyncIO+KQueue.swift | 396 ++++++++++++++++++ Sources/Subprocess/IO/AsyncIO+Linux.swift | 313 ++------------ Sources/Subprocess/IO/AsyncIO+Unix.swift | 283 +++++++++++++ Sources/Subprocess/IO/AsyncIO+Windows.swift | 217 ++++------ Sources/Subprocess/IO/Input.swift | 4 +- Sources/Subprocess/IO/Output.swift | 100 ++--- .../Platforms/Subprocess+Darwin.swift | 6 +- .../Platforms/Subprocess+Linux.swift | 47 ++- .../Platforms/Subprocess+Unix.swift | 8 +- .../Platforms/Subprocess+Windows.swift | 16 +- Sources/Subprocess/Span+Subprocess.swift | 28 +- .../Input+Foundation.swift | 36 +- .../_SubprocessCShims/include/process_shims.h | 6 +- Tests/SubprocessTests/AsyncIOTests.swift | 133 ++++-- Tests/SubprocessTests/IntegrationTests.swift | 7 +- Tests/SubprocessTests/UnixTests.swift | 6 +- 24 files changed, 1117 insertions(+), 1196 deletions(-) delete mode 100644 Sources/Subprocess/IO/AsyncIO+Dispatch.swift create mode 100644 Sources/Subprocess/IO/AsyncIO+KQueue.swift create mode 100644 Sources/Subprocess/IO/AsyncIO+Unix.swift diff --git a/Package.swift b/Package.swift index 56a47966..3cdcd5c9 100644 --- a/Package.swift +++ b/Package.swift @@ -22,7 +22,7 @@ dep.append( let defaultTraits: Set = ["SubprocessFoundation"] let packageSwiftSettings: [SwiftSetting] = [ - .define("SUBPROCESS_ASYNCIO_DISPATCH", .when(platforms: [.macOS, .custom("freebsd"), .openbsd])), + .define("SUBPROCESS_ASYNCIO_KQUEUE", .when(platforms: [.macOS, .custom("freebsd"), .openbsd])), .enableUpcomingFeature("ExistentialAny"), .enableUpcomingFeature("MemberImportVisibility"), .enableUpcomingFeature("InternalImportsByDefault"), diff 
--git a/Sources/Subprocess/API.swift b/Sources/Subprocess/API.swift index a731a189..74b8e261 100644 --- a/Sources/Subprocess/API.swift +++ b/Sources/Subprocess/API.swift @@ -112,7 +112,6 @@ public func run< /// - input: The input to send to the executable. /// - output: How to manage executable standard output. /// - error: How to manage executable standard error. -/// - isolation: The isolation context to run the body closure. /// - body: A closure to manage the running process. /// All arguments passed to this closure are valid only for /// the duration of the closure's execution and must not be escaped. @@ -160,12 +159,6 @@ public func run< /// - platformOptions: The platform-specific options to use when running the executable. /// - input: The input to send to the executable. /// - error: How to manage executable standard error. -/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading -/// from the subprocess's standard output stream. If `nil`, uses the system page size -/// as the default buffer size. Larger buffer sizes may improve performance for -/// subprocesses that produce large amounts of output, while smaller buffer sizes -/// may reduce memory usage and improve responsiveness for interactive applications. -/// - isolation: The isolation context to run the body closure. /// - body: A closure to manage the running process. /// All arguments passed to this closure are valid only for /// the duration of the closure's execution and must not be escaped. @@ -180,7 +173,6 @@ public func run( platformOptions: PlatformOptions = PlatformOptions(), input: Input = .none, error: Error = .discarded, - preferredBufferSize: Int? 
= nil, body: ( _ execution: Execution, _ outputSequence: AsyncBufferSequence @@ -197,7 +189,6 @@ public func run( configuration, input: input, error: error, - preferredBufferSize: preferredBufferSize, body: body ) } @@ -212,12 +203,6 @@ public func run( /// - platformOptions: The platform-specific options to use when running the executable. /// - input: The input to send to the executable. /// - output: How to manage executable standard output. -/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading -/// from the subprocess's standard error stream. If `nil`, uses the system page size -/// as the default buffer size. Larger buffer sizes may improve performance for -/// subprocesses that produce large amounts of output, while smaller buffer sizes -/// may reduce memory usage and improve responsiveness for interactive applications. -/// - isolation: The isolation context to run the body closure. /// - body: A closure to manage the running process. /// All arguments passed to this closure are valid only for /// the duration of the closure's execution and must not be escaped. @@ -232,7 +217,6 @@ public func run( platformOptions: PlatformOptions = PlatformOptions(), input: Input = .none, output: Output, - preferredBufferSize: Int? = nil, body: ( _ execution: Execution, _ errorSequence: AsyncBufferSequence @@ -249,7 +233,6 @@ public func run( configuration, input: input, output: output, - preferredBufferSize: preferredBufferSize, body: body ) } @@ -313,12 +296,6 @@ public func run( /// - workingDirectory: The working directory in which to run the executable. /// - platformOptions: The platform-specific options to use when running the executable. /// - error: How to manage executable standard error. -/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading -/// from the subprocess's standard output stream. If `nil`, uses the system page size -/// as the default buffer size. 
Larger buffer sizes may improve performance for -/// subprocesses that produce large amounts of output, while smaller buffer sizes -/// may reduce memory usage and improve responsiveness for interactive applications. -/// - isolation: The isolation context to run the body closure. /// - body: A closure to manage the running process. /// All arguments passed to this closure are valid only for /// the duration of the closure's execution and must not be escaped. @@ -333,7 +310,6 @@ public func run( workingDirectory: FilePath? = nil, platformOptions: PlatformOptions = PlatformOptions(), error: Error = .discarded, - preferredBufferSize: Int? = nil, body: ( _ execution: Execution, _ inputWriter: StandardInputWriter, @@ -350,7 +326,6 @@ public func run( return try await run( configuration, error: error, - preferredBufferSize: preferredBufferSize, body: body ) } @@ -364,12 +339,6 @@ public func run( /// - workingDirectory: The working directory in which to run the executable. /// - platformOptions: The platform-specific options to use when running the executable. /// - output: How to manage executable standard output. -/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading -/// from the subprocess's standard error stream. If `nil`, uses the system page size -/// as the default buffer size. Larger buffer sizes may improve performance for -/// subprocesses that produce large amounts of output, while smaller buffer sizes -/// may reduce memory usage and improve responsiveness for interactive applications. -/// - isolation: The isolation context to run the body closure. /// - body: A closure to manage the running process. /// All arguments passed to this closure are valid only for /// the duration of the closure's execution and must not be escaped. @@ -384,7 +353,6 @@ public func run( workingDirectory: FilePath? = nil, platformOptions: PlatformOptions = PlatformOptions(), output: Output, - preferredBufferSize: Int? 
= nil, body: ( _ execution: Execution, _ inputWriter: StandardInputWriter, @@ -401,7 +369,6 @@ public func run( return try await run( configuration, output: output, - preferredBufferSize: preferredBufferSize, body: body ) } @@ -415,12 +382,6 @@ public func run( /// - environment: The environment in which to run the executable. /// - workingDirectory: The working directory in which to run the executable. /// - platformOptions: The platform-specific options to use when running the executable. -/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading -/// from the subprocess's standard output and error stream. If `nil`, uses the system page size -/// as the default buffer size. Larger buffer sizes may improve performance for -/// subprocesses that produce large amounts of output, while smaller buffer sizes -/// may reduce memory usage and improve responsiveness for interactive applications. -/// - isolation: The isolation context to run the body closure. /// - body: A closure to manage the running process. /// All arguments passed to this closure are valid only for /// the duration of the closure's execution and must not be escaped. @@ -435,7 +396,6 @@ public func run( environment: Environment = .inherit, workingDirectory: FilePath? = nil, platformOptions: PlatformOptions = PlatformOptions(), - preferredBufferSize: Int? = nil, body: ( _ execution: Execution, _ inputWriter: StandardInputWriter, @@ -452,7 +412,6 @@ public func run( ) return try await run( configuration, - preferredBufferSize: preferredBufferSize, body: body ) } @@ -490,9 +449,9 @@ public func run< output: try output.createPipe(), error: try error.createPipe(), ) { execution, inputIO, outputIO, errorIO in - var inputIOBox: IOChannel? = consume inputIO - var outputIOBox: IOChannel? = consume outputIO - var errorIOBox: IOChannel? = consume errorIO + var inputIOBox: IODescriptor? = consume inputIO + var outputIOBox: IODescriptor? 
= consume outputIO + var errorIOBox: IODescriptor? = consume errorIO // Write input, capture output and error in parallel async let stdout = try output.captureOutput(from: outputIOBox.take()) @@ -552,16 +511,16 @@ public func run< error: errorPipe ) { execution, inputIO, outputIO, errorIO in // Write input, capture output and error in parallel - var inputIOBox: IOChannel? = consume inputIO - var outputIOBox: IOChannel? = consume outputIO - var errorIOBox: IOChannel? = consume errorIO + var inputIOBox: IODescriptor? = consume inputIO + var outputIOBox: IODescriptor? = consume outputIO + var errorIOBox: IODescriptor? = consume errorIO return try await withThrowingTaskGroup( of: OutputCapturingState?.self, returning: RunResult.self ) { group in - var inputIOContainer: IOChannel? = inputIOBox.take() - var outputIOContainer: IOChannel? = outputIOBox.take() - var errorIOContainer: IOChannel? = errorIOBox.take() + var inputIOContainer: IODescriptor? = inputIOBox.take() + var outputIOContainer: IODescriptor? = outputIOBox.take() + var errorIOContainer: IODescriptor? = errorIOBox.take() group.addTask { if let writeFd = inputIOContainer.take() { let writer = StandardInputWriter(diskIO: writeFd) @@ -629,7 +588,6 @@ public func run< /// - input: The input to send to the executable. /// - output: How to manage executable standard output. /// - error: How to manage executable standard error. -/// - isolation: The isolation context to run the body closure. /// - body: A closure to manage the running process. /// All arguments passed to this closure are valid only for /// the duration of the closure's execution and must not be escaped. @@ -656,12 +614,17 @@ public func run< output: outputPipe, error: errorPipe ) { execution, inputIO, outputIO, errorIO in - var inputIOBox: IOChannel? = consume inputIO + var inputIOBox: IODescriptor? = consume inputIO + var outputIOBox: IODescriptor? = consume outputIO + var errorIOBox: IODescriptor? 
= consume errorIO + try outputIOBox?.safelyClose() + try errorIOBox?.safelyClose() + return try await withThrowingTaskGroup( of: Void.self, returning: Result.self ) { group in - var inputIOContainer: IOChannel? = inputIOBox.take() + var inputIOContainer: IODescriptor? = inputIOBox.take() group.addTask { if let inputIO = inputIOContainer.take() { let writer = StandardInputWriter(diskIO: inputIO) @@ -672,6 +635,7 @@ public func run< // Body runs in the same isolation let result = try await body(execution) + try await group.waitForAll() return result } @@ -684,12 +648,6 @@ public func run< /// - configuration: The configuration to run. /// - input: The input to send to the executable. /// - error: How to manage executable standard error. -/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading -/// from the subprocess's standard output stream. If `nil`, uses the system page size -/// as the default buffer size. Larger buffer sizes may improve performance for -/// subprocesses that produce large amounts of output, while smaller buffer sizes -/// may reduce memory usage and improve responsiveness for interactive applications. -/// - isolation: The isolation context to run the body closure. /// - body: A closure to manage the running process. /// All arguments passed to this closure are valid only for /// the duration of the closure's execution and must not be escaped. @@ -704,7 +662,6 @@ public func run< _ configuration: Configuration, input: Input = .none, error: Error = .discarded, - preferredBufferSize: Int? = nil, body: ( _ execution: Execution, _ outputSequence: AsyncBufferSequence @@ -720,14 +677,16 @@ public func run< output: outputPipe, error: errorPipe ) { execution, inputIO, outputIO, errorIO in - var inputIOBox: IOChannel? = consume inputIO - var outputIOBox: IOChannel? = consume outputIO + var inputIOBox: IODescriptor? = consume inputIO + var outputIOBox: IODescriptor? = consume outputIO + var errorIOBox: IODescriptor? 
= consume errorIO + try errorIOBox?.safelyClose() return try await withThrowingTaskGroup( of: Void.self, returning: Result.self ) { group in - var inputIOContainer: IOChannel? = inputIOBox.take() + var inputIOContainer: IODescriptor? = inputIOBox.take() group.addTask { if let inputIO = inputIOContainer.take() { let writer = StandardInputWriter(diskIO: inputIO) @@ -738,8 +697,7 @@ public func run< // Body runs in the same isolation let outputSequence = AsyncBufferSequence( - diskIO: outputIOBox.take()!.consumeIOChannel(), - preferredBufferSize: preferredBufferSize + diskIO: outputIOBox!.consumeDescriptor() ) let result = try await body(execution, outputSequence) @@ -755,12 +713,6 @@ public func run< /// - configuration: The configuration to run. /// - input: The input to send to the executable. /// - output: How to manage executable standard output. -/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading -/// from the subprocess's standard error stream. If `nil`, uses the system page size -/// as the default buffer size. Larger buffer sizes may improve performance for -/// subprocesses that produce large amounts of output, while smaller buffer sizes -/// may reduce memory usage and improve responsiveness for interactive applications. -/// - isolation: The isolation context to run the body closure. /// - body: A closure to manage the running process. /// All arguments passed to this closure are valid only for /// the duration of the closure's execution and must not be escaped. @@ -771,7 +723,6 @@ public func run( _ configuration: Configuration, input: Input = .none, output: Output, - preferredBufferSize: Int? = nil, body: ( _ execution: Execution, _ errorSequence: AsyncBufferSequence @@ -784,14 +735,16 @@ public func run( output: try output.createPipe(), error: try error.createPipe(), ) { execution, inputIO, outputIO, errorIO in - var inputIOBox: IOChannel? = consume inputIO - var errorIOBox: IOChannel? 
= consume errorIO + var inputIOBox: IODescriptor? = consume inputIO + var outputIOBox: IODescriptor? = consume outputIO + var errorIOBox: IODescriptor? = consume errorIO + try outputIOBox?.safelyClose() return try await withThrowingTaskGroup( of: Void.self, returning: Result.self ) { group in - var inputIOContainer: IOChannel? = inputIOBox.take() + var inputIOContainer: IODescriptor? = inputIOBox.take() group.addTask { if let inputIO = inputIOContainer.take() { let writer = StandardInputWriter(diskIO: inputIO) @@ -799,14 +752,12 @@ public func run( try await writer.finish() } } - - // Body runs in the same isolation let errorSequence = AsyncBufferSequence( - diskIO: errorIOBox.take()!.consumeIOChannel(), - preferredBufferSize: preferredBufferSize + diskIO: errorIOBox!.consumeDescriptor() ) - + // Body runs in the same isolation let result = try await body(execution, errorSequence) + try await group.waitForAll() return result } @@ -850,15 +801,15 @@ public func run( output: try output.createPipe(), error: try error.createPipe(), ) { execution, inputIO, outputIO, errorIO in - var inputIOBox: IOChannel? = consume inputIO - var outputIOBox: IOChannel? = consume outputIO - var errorIOBox: IOChannel? = consume errorIO + var inputIOBox: IODescriptor? = consume inputIO + var outputIOBox: IODescriptor? = consume outputIO + var errorIOBox: IODescriptor? = consume errorIO return try await withThrowingTaskGroup( of: Void.self, returning: Result.self ) { group in - var inputIOContainer: IOChannel? = inputIOBox.take() + var inputIOContainer: IODescriptor? 
= inputIOBox.take() group.addTask { if let inputIO = inputIOContainer.take() { let writer = StandardInputWriter(diskIO: inputIO) @@ -869,13 +820,11 @@ public func run( // Body runs in the same isolation let outputSequence = AsyncBufferSequence( - diskIO: outputIOBox.take()!.consumeIOChannel(), - preferredBufferSize: preferredBufferSize + diskIO: outputIOBox!.consumeDescriptor() ) let errorSequence = AsyncBufferSequence( - diskIO: errorIOBox.take()!.consumeIOChannel(), - preferredBufferSize: preferredBufferSize + diskIO: errorIOBox!.consumeDescriptor() ) let result = try await body(execution, outputSequence, errorSequence) @@ -891,12 +840,6 @@ public func run( /// - Parameters: /// - configuration: The configuration to run. /// - error: How to manage executable standard error. -/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading -/// from the subprocess's standard output stream. If `nil`, uses the system page size -/// as the default buffer size. Larger buffer sizes may improve performance for -/// subprocesses that produce large amounts of output, while smaller buffer sizes -/// may reduce memory usage and improve responsiveness for interactive applications. -/// - isolation: The isolation context to run the body closure. /// - body: A closure to manage the running process. /// All arguments passed to this closure are valid only for /// the duration of the closure's execution and must not be escaped. @@ -907,7 +850,6 @@ public func run( public func run( _ configuration: Configuration, error: Error = .discarded, - preferredBufferSize: Int? = nil, body: ( _ execution: Execution, _ inputWriter: StandardInputWriter, @@ -925,13 +867,18 @@ public func run( output: outputPipe, error: errorPipe ) { execution, inputIO, outputIO, errorIO in + var outputIOBox = consume outputIO + var errorIOBox = consume errorIO + try errorIOBox?.safelyClose() + let writer = StandardInputWriter(diskIO: inputIO!) 
let outputSequence = AsyncBufferSequence( - diskIO: outputIO!.consumeIOChannel(), - preferredBufferSize: preferredBufferSize + diskIO: outputIOBox!.consumeDescriptor() ) - return try await body(execution, writer, outputSequence) + let result = try await body(execution, writer, outputSequence) + try await writer.finish() + return result } } @@ -941,12 +888,6 @@ public func run( /// - Parameters: /// - configuration: The configuration to run. /// - output: How to manage executable standard output. -/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading -/// from the subprocess's standard error stream. If `nil`, uses the system page size -/// as the default buffer size. Larger buffer sizes may improve performance for -/// subprocesses that produce large amounts of output, while smaller buffer sizes -/// may reduce memory usage and improve responsiveness for interactive applications. -/// - isolation: The isolation context to run the body closure. /// - body: A closure to manage the running process. /// All arguments passed to this closure are valid only for /// the duration of the closure's execution and must not be escaped. @@ -957,7 +898,6 @@ public func run( public func run( _ configuration: Configuration, output: Output, - preferredBufferSize: Int? = nil, body: ( _ execution: Execution, _ inputWriter: StandardInputWriter, @@ -972,12 +912,17 @@ public func run( output: try output.createPipe(), error: try error.createPipe(), ) { execution, inputIO, outputIO, errorIO in + var outputIOBox = consume outputIO + var errorIOBox = consume errorIO + try outputIOBox?.safelyClose() + let writer = StandardInputWriter(diskIO: inputIO!) 
let errorSequence = AsyncBufferSequence( - diskIO: errorIO!.consumeIOChannel(), - preferredBufferSize: preferredBufferSize + diskIO: errorIOBox!.consumeDescriptor() ) - return try await body(execution, writer, errorSequence) + let bodyResult = try await body(execution, writer, errorSequence) + try await writer.finish() + return bodyResult } } @@ -986,12 +931,6 @@ public func run( /// standard input, and stream its standard output and standard error. /// - Parameters: /// - configuration: The configuration to run. -/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading -/// from the subprocess's standard output and error stream. If `nil`, uses the system page size -/// as the default buffer size. Larger buffer sizes may improve performance for -/// subprocesses that produce large amounts of output, while smaller buffer sizes -/// may reduce memory usage and improve responsiveness for interactive applications. -/// - isolation: The isolation context to run the body closure. /// - body: A closure to manage the running process. /// All arguments passed to this closure are valid only for /// the duration of the closure's execution and must not be escaped. @@ -1002,7 +941,6 @@ public func run( /// - Returns: An ``ExecutionOutcome`` that contains the closure's return value. public func run( _ configuration: Configuration, - preferredBufferSize: Int? = nil, body: ( _ execution: Execution, _ inputWriter: StandardInputWriter, @@ -1019,15 +957,18 @@ public func run( output: try output.createPipe(), error: try error.createPipe() ) { execution, inputIO, outputIO, errorIO in + var outputIOBox = consume outputIO + var errorIOBox = consume errorIO + let writer = StandardInputWriter(diskIO: inputIO!) 
let outputSequence = AsyncBufferSequence( - diskIO: outputIO!.consumeIOChannel(), - preferredBufferSize: preferredBufferSize + diskIO: outputIOBox!.consumeDescriptor() ) let errorSequence = AsyncBufferSequence( - diskIO: errorIO!.consumeIOChannel(), - preferredBufferSize: preferredBufferSize + diskIO: errorIOBox!.consumeDescriptor() ) - return try await body(execution, writer, outputSequence, errorSequence) + let result = try await body(execution, writer, outputSequence, errorSequence) + try await writer.finish() + return result } } diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index 17429cfb..958dee76 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -38,9 +38,7 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { /// The element type for the asynchronous sequence. public typealias Element = Buffer - #if SUBPROCESS_ASYNCIO_DISPATCH - internal typealias DiskIO = DispatchIO - #elseif canImport(WinSDK) + #if canImport(WinSDK) internal typealias DiskIO = HANDLE #else internal typealias DiskIO = FileDescriptor @@ -55,10 +53,11 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { private let preferredBufferSize: Int private var buffer: [Buffer] - internal init(diskIO: DiskIO, preferredBufferSize: Int?) { + internal init(diskIO: DiskIO) { self.diskIO = diskIO self.buffer = [] - self.preferredBufferSize = preferredBufferSize ?? readBufferSize + // Only need to query it once at beginning of stream + self.preferredBufferSize = AsyncIO.queryPipeBufferSize(for: diskIO) } /// Retrieves the next buffer in the sequence, or `nil` if the sequence ended. @@ -74,24 +73,14 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { ) guard let data else { // We finished reading. 
Close the file descriptor now - #if SUBPROCESS_ASYNCIO_DISPATCH - try _safelyClose(.dispatchIO(self.diskIO)) - #elseif canImport(WinSDK) + #if canImport(WinSDK) try _safelyClose(.handle(self.diskIO)) #else try _safelyClose(.fileDescriptor(self.diskIO)) #endif return nil } - let createdBuffers = Buffer.createFrom(data) - // Most (all?) cases there should be only one buffer - // because DispatchData are mostly contiguous - if _fastPath(createdBuffers.count == 1) { - // No need to push to the stack - return createdBuffers[0] - } - self.buffer = createdBuffers - return self.buffer.removeFirst() + return Buffer(data: data) } /// Retrieves the next buffer in the sequence, or `nil` if the sequence ended. @@ -101,19 +90,14 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { } private let diskIO: DiskIO - private let preferredBufferSize: Int? - internal init(diskIO: DiskIO, preferredBufferSize: Int?) { + internal init(diskIO: DiskIO) { self.diskIO = diskIO - self.preferredBufferSize = preferredBufferSize } /// Creates an iterator for this asynchronous sequence. public func makeAsyncIterator() -> Iterator { - return Iterator( - diskIO: self.diskIO, - preferredBufferSize: self.preferredBufferSize - ) + return Iterator(diskIO: self.diskIO) } /// Splits the buffer into strings using the specified separator. 
@@ -268,29 +252,12 @@ extension AsyncBufferSequence { self.eofReached = true return nil } - #if SUBPROCESS_ASYNCIO_DISPATCH - // Unfortunately here we _have to_ copy the bytes out because - // DispatchIO (rightfully) reuses buffer, which means `buffer.data` - // has the same address on all iterations, therefore we can't directly - // create the result array from buffer.data - - // Calculate how many CodePoint elements we have - let elementCount = buffer.data.count / MemoryLayout.stride - - // Create array by copying from the buffer reinterpreted as CodePoint - let result: Array = buffer.data.withUnsafeBytes { ptr -> Array in - return Array( - UnsafeBufferPointer(start: ptr.baseAddress?.assumingMemoryBound(to: Encoding.CodeUnit.self), count: elementCount) - ) - } - #else // Cast data to CodeUnit type let result = buffer.withUnsafeBytes { ptr in return ptr.withMemoryRebound(to: Encoding.CodeUnit.self) { codeUnitPtr in return Array(codeUnitPtr) } } - #endif return result.isEmpty ? nil : result } @@ -604,6 +571,6 @@ private let _pageSize: Int = Int(getpagesize()) #endif // canImport(Darwin) @inline(__always) -internal var readBufferSize: Int { +internal var systemPageSize: Int { return _pageSize } diff --git a/Sources/Subprocess/Buffer.swift b/Sources/Subprocess/Buffer.swift index 0aa425e3..1c350ad8 100644 --- a/Sources/Subprocess/Buffer.swift +++ b/Sources/Subprocess/Buffer.swift @@ -9,11 +9,6 @@ // //===----------------------------------------------------------------------===// -// swift-format-ignore-file - -#if canImport(Darwin) || canImport(Glibc) || canImport(Android) || canImport(Musl) -@preconcurrency internal import Dispatch - #if SubprocessFoundation #if canImport(Darwin) @@ -24,41 +19,16 @@ internal import Foundation internal import FoundationEssentials #endif -#endif #endif extension AsyncBufferSequence { /// An immutable collection of bytes. 
public struct Buffer: Sendable { - #if SUBPROCESS_ASYNCIO_DISPATCH - // We need to keep the backingData alive while Slice is alive - internal let backingData: DispatchData - internal let data: DispatchData.Region - - internal init(data: DispatchData.Region, backingData: DispatchData) { - self.data = data - self.backingData = backingData - } - - internal static func createFrom(_ data: DispatchData) -> [Buffer] { - let slices = data.regions - // In most (all?) cases data should only have one slice - if _fastPath(slices.count == 1) { - return [.init(data: slices[0], backingData: data)] - } - return slices.map { .init(data: $0, backingData: data) } - } - #else internal let data: [UInt8] internal init(data: [UInt8]) { self.data = data } - - internal static func createFrom(_ data: [UInt8]) -> [Buffer] { - return [.init(data: data)] - } - #endif // SUBPROCESS_ASYNCIO_DISPATCH } } @@ -104,78 +74,3 @@ extension AsyncBufferSequence.Buffer { } } } - -// MARK: - Hashable, Equatable -extension AsyncBufferSequence.Buffer: Equatable, Hashable { - #if SUBPROCESS_ASYNCIO_DISPATCH - /// Returns a Boolean value that indicates whether two buffers are equal. - public static func == (lhs: AsyncBufferSequence.Buffer, rhs: AsyncBufferSequence.Buffer) -> Bool { - return lhs.data == rhs.data - } - - /// Hashes the essential components of this value by feeding them into the given hasher. 
- public func hash(into hasher: inout Hasher) { - return self.data.hash(into: &hasher) - } - #endif - // else Compiler generated conformances -} - -#if SUBPROCESS_ASYNCIO_DISPATCH -extension DispatchData.Region { - static func == (lhs: DispatchData.Region, rhs: DispatchData.Region) -> Bool { - return lhs.withUnsafeBytes { lhsBytes in - return rhs.withUnsafeBytes { rhsBytes in - return lhsBytes.elementsEqual(rhsBytes) - } - } - } - - internal func hash(into hasher: inout Hasher) { - return self.withUnsafeBytes { ptr in - return hasher.combine(bytes: ptr) - } - } -} -#if !canImport(Darwin) || !SubprocessFoundation -/// `DispatchData.Region` is defined in Foundation, but we can't depend on Foundation when the SubprocessFoundation trait is disabled. -extension DispatchData { - typealias Region = _ContiguousBufferView - - var regions: [Region] { - contiguousBufferViews - } - - internal struct _ContiguousBufferView: @unchecked Sendable, RandomAccessCollection { - typealias Element = UInt8 - - internal let bytes: UnsafeBufferPointer - - internal var startIndex: Int { self.bytes.startIndex } - internal var endIndex: Int { self.bytes.endIndex } - - internal init(bytes: UnsafeBufferPointer) { - self.bytes = bytes - } - - internal func withUnsafeBytes(_ body: (UnsafeRawBufferPointer) throws -> ResultType) rethrows -> ResultType { - return try body(UnsafeRawBufferPointer(self.bytes)) - } - - subscript(position: Int) -> UInt8 { - _read { - yield self.bytes[position] - } - } - } - - internal var contiguousBufferViews: [_ContiguousBufferView] { - var slices = [_ContiguousBufferView]() - enumerateBytes { (bytes, index, stop) in - slices.append(_ContiguousBufferView(bytes: bytes)) - } - return slices - } -} -#endif -#endif diff --git a/Sources/Subprocess/CMakeLists.txt b/Sources/Subprocess/CMakeLists.txt index 9d09119d..438bb0b3 100644 --- a/Sources/Subprocess/CMakeLists.txt +++ b/Sources/Subprocess/CMakeLists.txt @@ -18,9 +18,10 @@ add_library(Subprocess Result.swift 
IO/Output.swift IO/Input.swift - IO/AsyncIO+Dispatch.swift + IO/AsyncIO+KQueue.swift IO/AsyncIO+Linux.swift IO/AsyncIO+Windows.swift + IO/AsyncIO+Unix.swift Span+Subprocess.swift AsyncBufferSequence.swift API.swift @@ -41,13 +42,13 @@ elseif(APPLE) Platforms/Subprocess+Darwin.swift Platforms/Subprocess+Unix.swift) target_compile_options(Subprocess PRIVATE - "$<$:-DSUBPROCESS_ASYNCIO_DISPATCH>") + "$<$:-DSUBPROCESS_ASYNCIO_KQUEUE>") elseif(FREEBSD OR OPENBSD) target_sources(Subprocess PRIVATE Platforms/Subprocess+BSD.swift Platforms/Subprocess+Unix.swift) target_compile_options(Subprocess PRIVATE - "$<$:-DSUBPROCESS_ASYNCIO_DISPATCH>") + "$<$:-DSUBPROCESS_ASYNCIO_KQUEUE>") endif() target_compile_options(Subprocess PRIVATE diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index 3edfe42e..1a87c0ee 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -27,10 +27,6 @@ import Musl @preconcurrency public import WinSDK #endif -internal import Dispatch - -import Synchronization - /// A collection of configuration parameters to use when /// spawning a subprocess. public struct Configuration: Sendable { @@ -93,36 +89,34 @@ public struct Configuration: Sendable { _ body: ( ( Execution, - consuming IOChannel?, - consuming IOChannel?, - consuming IOChannel? + consuming IODescriptor?, + consuming IODescriptor?, + consuming IODescriptor? ) async throws -> Result ) ) async throws -> ExecutionOutcome { - let spawnResults = try await self.spawn( + var spawnResults = try await self.spawn( withInput: input, outputPipe: output, errorPipe: error ) - var spawnResultBox: SpawnResult?? = consume spawnResults - var _spawnResult = spawnResultBox!.take()! 
- - let execution = _spawnResult.execution + let execution = spawnResults.execution defer { // Close process file descriptor now we finished monitoring execution.processIdentifier.close() } return try await withAsyncTaskCleanupHandler { () throws -> ExecutionOutcome in - let inputIO = _spawnResult.inputWriteEnd() - let outputIO = _spawnResult.outputReadEnd() - let errorIO = _spawnResult.errorReadEnd() + let inputIO = spawnResults.inputWriteEnd() + let outputIO = spawnResults.outputReadEnd() + let errorIO = spawnResults.errorReadEnd() let result: Swift.Result do { // Body runs in the same isolation - let bodyResult = try await body(_spawnResult.execution, inputIO, outputIO, errorIO) + let bodyResult = try await body(execution, inputIO, outputIO, errorIO) + result = .success(bodyResult) } catch { result = .failure(error) @@ -200,17 +194,17 @@ extension Configuration { // as opposed to actually try to close it. var remainingSet: Set = Set( optionalSequence: [ - inputRead?.descriptor, - inputWrite?.descriptor, - outputRead?.descriptor, - outputWrite?.descriptor, - errorRead?.descriptor, - errorWrite?.descriptor, + inputRead?.descriptor(), + inputWrite?.descriptor(), + outputRead?.descriptor(), + outputWrite?.descriptor(), + errorRead?.descriptor(), + errorWrite?.descriptor(), ] ) do { - if remainingSet.tryRemove(inputRead?.descriptor) { + if remainingSet.tryRemove(inputRead?.descriptor()) { try inputRead?.safelyClose() } else { try inputRead?.markAsClosed() @@ -219,7 +213,7 @@ extension Configuration { possibleError = error } do { - if remainingSet.tryRemove(inputWrite?.descriptor) { + if remainingSet.tryRemove(inputWrite?.descriptor()) { try inputWrite?.safelyClose() } else { try inputWrite?.markAsClosed() @@ -228,7 +222,7 @@ extension Configuration { possibleError = error } do { - if remainingSet.tryRemove(outputRead?.descriptor) { + if remainingSet.tryRemove(outputRead?.descriptor()) { try outputRead?.safelyClose() } else { try outputRead?.markAsClosed() @@ -237,7 
+231,7 @@ extension Configuration { possibleError = error } do { - if remainingSet.tryRemove(outputWrite?.descriptor) { + if remainingSet.tryRemove(outputWrite?.descriptor()) { try outputWrite?.safelyClose() } else { try outputWrite?.markAsClosed() @@ -246,7 +240,7 @@ extension Configuration { possibleError = error } do { - if remainingSet.tryRemove(errorRead?.descriptor) { + if remainingSet.tryRemove(errorRead?.descriptor()) { try errorRead?.safelyClose() } else { try errorRead?.markAsClosed() @@ -255,7 +249,7 @@ extension Configuration { possibleError = error } do { - if remainingSet.tryRemove(errorWrite?.descriptor) { + if remainingSet.tryRemove(errorWrite?.descriptor()) { try errorWrite?.safelyClose() } else { try errorWrite?.markAsClosed() @@ -660,15 +654,15 @@ extension Configuration { /// via `SpawnResult` to perform actual reads internal struct SpawnResult: ~Copyable { let execution: Execution - var _inputWriteEnd: IOChannel? - var _outputReadEnd: IOChannel? - var _errorReadEnd: IOChannel? + var _inputWriteEnd: IODescriptor? + var _outputReadEnd: IODescriptor? + var _errorReadEnd: IODescriptor? init( execution: Execution, - inputWriteEnd: consuming IOChannel?, - outputReadEnd: consuming IOChannel?, - errorReadEnd: consuming IOChannel? + inputWriteEnd: consuming IODescriptor?, + outputReadEnd: consuming IODescriptor?, + errorReadEnd: consuming IODescriptor? ) { self.execution = execution self._inputWriteEnd = consume inputWriteEnd @@ -676,15 +670,15 @@ extension Configuration { self._errorReadEnd = consume errorReadEnd } - mutating func inputWriteEnd() -> IOChannel? { + mutating func inputWriteEnd() -> IODescriptor? { return self._inputWriteEnd.take() } - mutating func outputReadEnd() -> IOChannel? { + mutating func outputReadEnd() -> IODescriptor? { return self._outputReadEnd.take() } - mutating func errorReadEnd() -> IOChannel? { + mutating func errorReadEnd() -> IODescriptor? 
{ return self._errorReadEnd.take() } } @@ -760,7 +754,6 @@ internal enum _CloseTarget { #else case fileDescriptor(FileDescriptor) #endif - case dispatchIO(DispatchIO) } internal func _safelyClose(_ target: _CloseTarget) throws(SubprocessError) { @@ -822,8 +815,6 @@ internal func _safelyClose(_ target: _CloseTarget) throws(SubprocessError) { ) } #endif - case .dispatchIO(let dispatchIO): - dispatchIO.close() } } @@ -843,16 +834,16 @@ internal struct IODescriptor: ~Copyable { internal var closeWhenDone: Bool #if canImport(WinSDK) - internal nonisolated(unsafe) let descriptor: Descriptor + internal nonisolated(unsafe) let _descriptor: Descriptor #else - internal let descriptor: Descriptor + internal let _descriptor: Descriptor #endif internal init( _ descriptor: Descriptor, closeWhenDone: Bool ) { - self.descriptor = descriptor + self._descriptor = descriptor self.closeWhenDone = closeWhenDone } @@ -867,38 +858,15 @@ internal struct IODescriptor: ~Copyable { func duplicate() throws(SubprocessError) -> IODescriptor { do throws(any Error) { - return try IODescriptor(self.descriptor.duplicate(), closeWhenDone: self.closeWhenDone) + return try IODescriptor(self._descriptor.duplicate(), closeWhenDone: self.closeWhenDone) } catch { throw SubprocessError.asyncIOFailed( - reason: "Failed to duplicate file descriptor \(self.descriptor)", + reason: "Failed to duplicate file descriptor \(self._descriptor)", underlyingError: error as? SubprocessError.UnderlyingError ) } } - consuming func createIOChannel() -> IOChannel { - let shouldClose = self.closeWhenDone - self.closeWhenDone = false - #if SUBPROCESS_ASYNCIO_DISPATCH - // Transferring out the ownership of fileDescriptor means we don't have go close here - let closeFd = self.descriptor - let dispatchIO: DispatchIO = DispatchIO( - type: .stream, - fileDescriptor: self.platformDescriptor(), - queue: .global(), - cleanupHandler: { @Sendable error in - // Close the file descriptor - if shouldClose { - try? 
closeFd.close() - } - } - ) - return IOChannel(dispatchIO, closeWhenDone: shouldClose) - #else - return IOChannel(self.descriptor, closeWhenDone: shouldClose) - #endif - } - internal mutating func safelyClose() throws(SubprocessError) { guard self.closeWhenDone else { return @@ -906,9 +874,9 @@ internal struct IODescriptor: ~Copyable { closeWhenDone = false #if canImport(WinSDK) - try _safelyClose(.handle(self.descriptor)) + try _safelyClose(.handle(self._descriptor)) #else - try _safelyClose(.fileDescriptor(self.descriptor)) + try _safelyClose(.fileDescriptor(self._descriptor)) #endif } @@ -921,64 +889,31 @@ internal struct IODescriptor: ~Copyable { return } - fatalError("FileDescriptor \(self.descriptor) was not closed") + fatalError("FileDescriptor \(self._descriptor) was not closed") } internal func platformDescriptor() -> PlatformFileDescriptor { #if canImport(WinSDK) - return self.descriptor + return self._descriptor #else - return self.descriptor.platformDescriptor + return self._descriptor.platformDescriptor #endif } -} -internal struct IOChannel: ~Copyable, @unchecked Sendable { - #if SUBPROCESS_ASYNCIO_DISPATCH - typealias Channel = DispatchIO - #elseif canImport(WinSDK) - typealias Channel = HANDLE - #else - typealias Channel = FileDescriptor - #endif - - internal var closeWhenDone: Bool - internal let channel: Channel - - internal init( - _ channel: Channel, - closeWhenDone: Bool - ) { - self.channel = channel - self.closeWhenDone = closeWhenDone + internal func descriptor() -> Descriptor { + return self._descriptor } - internal mutating func safelyClose() throws(SubprocessError) { - guard self.closeWhenDone else { - return - } - closeWhenDone = false - - #if SUBPROCESS_ASYNCIO_DISPATCH - try _safelyClose(.dispatchIO(self.channel)) - #elseif canImport(WinSDK) - try _safelyClose(.handle(self.channel)) - #else - try _safelyClose(.fileDescriptor(self.channel)) - #endif - } - - @_optimize(none) - internal consuming func consumeIOChannel() -> Channel { - let 
result = self.channel - // Transfer the ownership out and therefor - // don't perform close on deinit + internal mutating func consumeDescriptor() -> Descriptor { + // This Descriptor has been consumed, we don't need to close it anymore self.closeWhenDone = false - return result + return self._descriptor } } #if canImport(WinSDK) +import Synchronization + internal enum PipeNameCounter { private static let value = Atomic(0) @@ -1069,8 +1004,9 @@ internal struct CreatedPipe: ~Copyable { openMode, DWORD(PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS), 1, // Max instance, - DWORD(readBufferSize), - DWORD(readBufferSize), + // Both libuv (Node) and Rust std::process use this value + DWORD(64 * 1024), + DWORD(64 * 1024), 0, nil ) diff --git a/Sources/Subprocess/Error.swift b/Sources/Subprocess/Error.swift index 7de1108e..264b84be 100644 --- a/Sources/Subprocess/Error.swift +++ b/Sources/Subprocess/Error.swift @@ -21,11 +21,20 @@ import Musl @preconcurrency public import WinSDK #endif +#if os(Windows) +// Windows does not use Errno in public type +#if canImport(System) +import System +#else +import SystemPackage +#endif +#else #if canImport(System) public import System #else public import SystemPackage #endif +#endif /// An error thrown by a subprocess operation. /// diff --git a/Sources/Subprocess/IO/AsyncIO+Dispatch.swift b/Sources/Subprocess/IO/AsyncIO+Dispatch.swift deleted file mode 100644 index b8a438bd..00000000 --- a/Sources/Subprocess/IO/AsyncIO+Dispatch.swift +++ /dev/null @@ -1,186 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift.org open source project -// -// Copyright (c) 2025 Apple Inc. 
and the Swift project authors -// Licensed under Apache License v2.0 with Runtime Library Exception -// -// See https://swift.org/LICENSE.txt for license information -// -//===----------------------------------------------------------------------===// - -/// Darwin AsyncIO implementation based on DispatchIO - -// MARK: - macOS (DispatchIO) -#if SUBPROCESS_ASYNCIO_DISPATCH - -#if canImport(System) -import System -#else -import SystemPackage -#endif - -internal import Dispatch - -final class AsyncIO: Sendable { - static let shared: AsyncIO = AsyncIO() - - internal init() {} - - internal func shutdown() { /* noop on Darwin */ } - - internal func read( - from diskIO: borrowing IOChannel, - upTo maxLength: Int - ) async throws(SubprocessError) -> DispatchData? { - return try await self.read( - from: diskIO.channel, - upTo: maxLength, - ) - } - - internal func read( - from dispatchIO: DispatchIO, - upTo maxLength: Int - ) async throws(SubprocessError) -> DispatchData? { - // https://github.com/swiftlang/swift/issues/87810 - return try await _castError { - return try await withCheckedThrowingContinuation { continuation in - var buffer: DispatchData = .empty - dispatchIO.read( - offset: 0, - length: maxLength, - queue: DispatchQueue(label: "SubprocessReadQueue") - ) { done, data, error in - if error != 0 { - continuation.resume( - throwing: - SubprocessError - .failedToReadFromProcess( - withUnderlyingError: Errno(rawValue: error) - ) - ) - return - } - if let data { - if buffer.isEmpty { - buffer = data - } else { - buffer.append(data) - } - } - if done { - if !buffer.isEmpty { - continuation.resume(returning: buffer) - } else { - continuation.resume(returning: nil) - } - } - } - } - } - } - - internal func write( - _ span: borrowing RawSpan, - to diskIO: borrowing IOChannel - ) async throws(SubprocessError) -> Int { - // https://github.com/swiftlang/swift/issues/87810 - return try await _castError { - return try await withCheckedThrowingContinuation { (continuation: 
CheckedContinuation) in - span.withUnsafeBytes { - let dispatchData = DispatchData( - bytesNoCopy: $0, - deallocator: .custom( - nil, - { - // noop - } - ) - ) - - self.write(dispatchData, to: diskIO) { writtenLength, error in - if let error { - continuation.resume(throwing: error) - } else { - continuation.resume(returning: writtenLength) - } - } - } - } - } - } - - internal func write( - _ array: [UInt8], - to diskIO: borrowing IOChannel - ) async throws(SubprocessError) -> Int { - // https://github.com/swiftlang/swift/issues/87810 - return try await _castError { - return try await withCheckedThrowingContinuation { continuation in - array.withUnsafeBytes { - let dispatchData = DispatchData( - bytesNoCopy: $0, - deallocator: .custom( - nil, - { - // noop - } - ) - ) - - self.write(dispatchData, to: diskIO) { writtenLength, error in - if let error { - continuation.resume(throwing: error) - } else { - continuation.resume(returning: writtenLength) - } - } - } - } - } - } - - internal func write( - _ dispatchData: DispatchData, - to diskIO: borrowing IOChannel, - queue: DispatchQueue = .global(), - completion: @escaping (Int, SubprocessError?) -> Void - ) { - diskIO.channel.write( - offset: 0, - data: dispatchData, - queue: queue - ) { done, unwritten, error in - guard done else { - // Wait until we are done writing or encountered some error - return - } - - let unwrittenLength = unwritten?.count ?? 
0 - let writtenLength = dispatchData.count - unwrittenLength - guard error != 0 else { - completion(writtenLength, nil) - return - } - completion( - writtenLength, - .failedToWriteToProcess(withUnderlyingError: Errno(rawValue: error)) - ) - } - } -} - -#if canImport(Darwin) -// Dispatch has a -user-module-version of 54 in the macOS 15.3 SDK -#if canImport(Dispatch, _version: "54") -// DispatchData is annotated as Sendable -#else -// Retroactively conform DispatchData to Sendable -extension DispatchData: @retroactive @unchecked Sendable {} -#endif // canImport(Dispatch, _version: "54") -#else -extension DispatchData: @retroactive @unchecked Sendable {} -#endif // canImport(Darwin) - -#endif // SUBPROCESS_ASYNCIO_DISPATCH diff --git a/Sources/Subprocess/IO/AsyncIO+KQueue.swift b/Sources/Subprocess/IO/AsyncIO+KQueue.swift new file mode 100644 index 00000000..e0f889f2 --- /dev/null +++ b/Sources/Subprocess/IO/AsyncIO+KQueue.swift @@ -0,0 +1,396 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift.org open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +/// AsyncIO implementation based on kqueue + +#if SUBPROCESS_ASYNCIO_KQUEUE + +#if canImport(System) +import System +#else +import SystemPackage +#endif + +#if canImport(Darwin) +import Darwin +#elseif canImport(Glibc) +import Glibc +#endif + +#if canImport(os) +import os +#endif + +import _SubprocessCShims +import Synchronization + +#if canImport(Darwin) +private typealias _Mutex = OSAllocatedUnfairLock +#else +private typealias _Mutex = Synchronization.Mutex +#endif + +// The kevent() C function and the kevent struct share the same name. 
+// Swift can disambiguate when given an explicit function type. +private let _kevent: + @convention(c) ( + Int32, + UnsafePointer?, + Int32, + UnsafeMutablePointer?, + Int32, + UnsafePointer? + ) -> Int32 = kevent + +private let _kqueueEventSize = 256 +private let _registration: + _Mutex< + [PlatformFileDescriptor: SignalStream.Continuation] + > = _Mutex([:]) + +private func _makeKevent( + ident: UInt, + filter: Int16, + flags: UInt16 +) -> kevent { + #if canImport(Darwin) + return kevent( + ident: ident, + filter: filter, + flags: flags, + fflags: 0, + data: 0, + udata: nil + ) + #else + return kevent( + ident: ident, + filter: filter, + flags: flags, + fflags: 0, + data: 0, + udata: nil, + ext: (0, 0, 0, 0) + ) + #endif +} + +final class AsyncIO: Sendable { + + typealias OutputStream = AsyncThrowingStream + + private struct MonitorThreadContext: Sendable { + let kqueueFileDescriptor: CInt + let shutdownReadFileDescriptor: CInt + } + + private struct State: Sendable { + let kqueueFileDescriptor: CInt + let shutdownReadFileDescriptor: CInt + let shutdownWriteFileDescriptor: CInt + nonisolated(unsafe) let monitorThread: pthread_t + } + + static let shared: AsyncIO = AsyncIO() + + private let state: Result + #if canImport(Darwin) + private let shutdownFlag: OSAllocatedUnfairLock = OSAllocatedUnfairLock(initialState: false) + #else + private let shutdownFlag: Atomic = Atomic(0) + #endif + + internal init() { + #if os(FreeBSD) || os(OpenBSD) + let kqueueFileDescriptor = kqueue1(O_CLOEXEC) + #else + let kqueueFileDescriptor = kqueue() + #endif + guard kqueueFileDescriptor >= 0 else { + let error: SubprocessError = .asyncIOFailed( + reason: "kqueue failed", + underlyingError: Errno(rawValue: errno) + ) + self.state = .failure(error) + return + } + let shutdownPipe: (readEnd: FileDescriptor, writeEnd: FileDescriptor) + do { + shutdownPipe = try FileDescriptor.pipe() + } catch { + let error: SubprocessError = .asyncIOFailed( + reason: "pipe failed for shutdown signaling", 
+ underlyingError: error as? Errno + ) + self.state = .failure(error) + return + } + let shutdownReadFd = shutdownPipe.readEnd.rawValue + let shutdownWriteFd = shutdownPipe.writeEnd.rawValue + + var shutdownEvent = _makeKevent( + ident: UInt(shutdownReadFd), + filter: Int16(EVFILT_READ), + flags: UInt16(EV_ADD | EV_ENABLE) + ) + let rc = _kevent( + kqueueFileDescriptor, + &shutdownEvent, + 1, + nil, + 0, + nil + ) + guard rc == 0 else { + let error: SubprocessError = .asyncIOFailed( + reason: "failed to add shutdown fd to kqueue", + underlyingError: Errno(rawValue: errno) + ) + self.state = .failure(error) + return + } + + let context = MonitorThreadContext( + kqueueFileDescriptor: kqueueFileDescriptor, + shutdownReadFileDescriptor: shutdownReadFd + ) + let thread: pthread_t + do throws(Errno) { + thread = try pthread_create { + func reportError(_ error: SubprocessError) { + _registration.withLock { store in + for continuation in store.values { + continuation.finish(throwing: error) + } + } + } + + var events: [kevent] = Array( + repeating: _makeKevent(ident: 0, filter: 0, flags: 0), + count: _kqueueEventSize + ) + + monitorLoop: while true { + let eventCount = _kevent( + context.kqueueFileDescriptor, + nil, + 0, + &events, + Int32(events.count), + nil + ) + if eventCount < 0 { + if errno == EINTR || errno == EAGAIN { + continue + } + let error: SubprocessError = .asyncIOFailed( + reason: "kevent wait failed", + underlyingError: Errno(rawValue: errno) + ) + reportError(error) + break monitorLoop + } + + for index in 0.. SignalStream.Continuation? 
in + if let continuation = store[targetFileDescriptor] { + return continuation + } + return nil + } + continuation?.yield(true) + } + } + } + } catch let errno { + let error: SubprocessError = .asyncIOFailed( + reason: "Failed to create monitor thread", + underlyingError: errno + ) + self.state = .failure(error) + return + } + + let state = State( + kqueueFileDescriptor: kqueueFileDescriptor, + shutdownReadFileDescriptor: shutdownReadFd, + shutdownWriteFileDescriptor: shutdownWriteFd, + monitorThread: thread + ) + self.state = .success(state) + + atexit { + AsyncIO.shared.shutdown() + } + } + + internal func shutdown() { + guard case .success(let currentState) = self.state else { + return + } + + #if canImport(Darwin) + let alreadyShutdown = self.shutdownFlag.withLock { flag -> Bool in + if flag { return true } + flag = true + return false + } + guard !alreadyShutdown else { return } + #else + guard self.shutdownFlag.add(1, ordering: .sequentiallyConsistent).newValue == 1 else { + return + } + #endif + var one: UInt8 = 1 + let kqueueFd = FileDescriptor(rawValue: currentState.kqueueFileDescriptor) + let shutdownWriteFd = FileDescriptor(rawValue: currentState.shutdownWriteFileDescriptor) + let shutdownReadFd = FileDescriptor(rawValue: currentState.shutdownReadFileDescriptor) + withUnsafeBytes(of: &one) { ptr in + _ = try? shutdownWriteFd.write(ptr) + } + + pthread_join(currentState.monitorThread, nil) + var closeError: Errno? = nil + do { + try kqueueFd.close() + } catch { + closeError = error as? Errno + } + do { + try shutdownReadFd.close() + } catch { + closeError = error as? Errno + } + do { + try shutdownWriteFd.close() + } catch { + closeError = error as? 
Errno + } + + if let closeError { + fatalError("Failed to close kqueue fds: \(closeError)") + } + } + + internal func registerFileDescriptor( + _ fileDescriptor: FileDescriptor, + for event: Event + ) -> SignalStream { + return SignalStream { (continuation: SignalStream.Continuation) -> () in + switch self.state { + case .success(let state): + if let nonBlockingFdError = self.setNonblocking(for: fileDescriptor) { + continuation.finish(throwing: nonBlockingFdError) + return + } + let filter: Int16 + switch event { + case .read: + filter = Int16(EVFILT_READ) + case .write: + filter = Int16(EVFILT_WRITE) + } + + _registration.withLock { storage in + storage[fileDescriptor.rawValue] = continuation + } + + var kev = _makeKevent( + ident: UInt(fileDescriptor.rawValue), + filter: filter, + flags: UInt16(EV_ADD | EV_ENABLE) + ) + let rc = _kevent( + state.kqueueFileDescriptor, + &kev, + 1, + nil, + 0, + nil + ) + if rc != 0 { + _registration.withLock { storage in + _ = storage.removeValue(forKey: fileDescriptor.rawValue) + } + let capturedError = errno + let error: SubprocessError = .asyncIOFailed( + reason: "failed to add \(fileDescriptor.rawValue) to kqueue", + underlyingError: Errno(rawValue: capturedError) + ) + continuation.finish(throwing: error) + return + } + case .failure(let setupError): + continuation.finish(throwing: setupError) + return + } + } + } + + internal func removeRegistration(for fileDescriptor: FileDescriptor) throws(SubprocessError) { + switch self.state { + case .success(let state): + let registration = _registration.withLock { store in + return store.removeValue(forKey: fileDescriptor.rawValue) + } + guard let registration else { + return + } + registration.finish() + var kev = _makeKevent( + ident: UInt(fileDescriptor.rawValue), + filter: Int16(EVFILT_READ), + flags: UInt16(EV_DELETE) + ) + _ = _kevent( + state.kqueueFileDescriptor, + &kev, + 1, + nil, + 0, + nil + ) + kev.filter = Int16(EVFILT_WRITE) + _ = _kevent( + state.kqueueFileDescriptor, 
+ &kev, + 1, + nil, + 0, + nil + ) + case .failure(let setupFailure): + throw setupFailure + } + } +} + +#if canImport(Darwin) +extension OSAllocatedUnfairLock where State: Sendable { + fileprivate init(_ initialValue: State) { + self.init(initialState: initialValue) + } +} +#endif + +#endif // SUBPROCESS_ASYNCIO_KQUEUE diff --git a/Sources/Subprocess/IO/AsyncIO+Linux.swift b/Sources/Subprocess/IO/AsyncIO+Linux.swift index 9baf1a0f..6bf7d98c 100644 --- a/Sources/Subprocess/IO/AsyncIO+Linux.swift +++ b/Sources/Subprocess/IO/AsyncIO+Linux.swift @@ -31,7 +31,6 @@ import Musl import _SubprocessCShims import Synchronization -private typealias SignalStream = AsyncThrowingStream private let _epollEventSize = 256 private let _registration: Mutex< @@ -55,11 +54,6 @@ final class AsyncIO: Sendable { } } - private enum Event { - case read - case write - } - private struct State { let epollFileDescriptor: CInt let shutdownFileDescriptor: CInt @@ -119,7 +113,7 @@ final class AsyncIO: Sendable { shutdownFileDescriptor: shutdownFileDescriptor ) let thread: pthread_t - do { + do throws(Errno) { thread = try pthread_create { func reportError(_ error: SubprocessError) { _registration.withLock { store in @@ -162,7 +156,11 @@ final class AsyncIO: Sendable { // from the shutdownFD if targetFileDescriptor == context.shutdownFileDescriptor { var buf: UInt64 = 0 - _ = _subprocess_read(context.shutdownFileDescriptor, &buf, MemoryLayout.size) + withUnsafeMutableBytes(of: &buf) { ptr in + _ = try? 
FileDescriptor( + rawValue: context.shutdownFileDescriptor + ).read(into: ptr, retryOnInterrupt: true) + } break monitorLoop } @@ -209,22 +207,32 @@ final class AsyncIO: Sendable { } var one: UInt64 = 1 // Wake up the thread for shutdown - _ = _subprocess_write(currentState.shutdownFileDescriptor, &one, MemoryLayout.stride) + let shutdownFd = FileDescriptor(rawValue: currentState.shutdownFileDescriptor) + let epollFd = FileDescriptor(rawValue: currentState.epollFileDescriptor) + withUnsafeBytes(of: &one) { ptr in + _ = try? shutdownFd.write(ptr) + } // Cleanup the monitor thread pthread_join(currentState.monitorThread, nil) - var closeError: CInt = 0 - if _subprocess_close(currentState.epollFileDescriptor) != 0 { - closeError = errno + + var closeError: Errno? = nil + do { + try epollFd.close() + } catch { + closeError = error as? Errno } - if _subprocess_close(currentState.shutdownFileDescriptor) != 0 { - closeError = errno + do { + try shutdownFd.close() + } catch { + closeError = error as? 
Errno } - if closeError != 0 { - fatalError("Failed to close epollfd: \(String(cString: strerror(closeError)))") + + if let closeError { + fatalError("Failed to close epollfd: \(closeError)") } } - private func registerFileDescriptor( + internal func registerFileDescriptor( _ fileDescriptor: FileDescriptor, for event: Event ) -> SignalStream { @@ -233,21 +241,8 @@ final class AsyncIO: Sendable { switch self.state { case .success(let state): // Set file descriptor to be non blocking - let flags = fcntl(fileDescriptor.rawValue, F_GETFD) - guard flags != -1 else { - let error: SubprocessError = .asyncIOFailed( - reason: "failed to get flags for \(fileDescriptor.rawValue)", - underlyingError: Errno(rawValue: errno) - ) - continuation.finish(throwing: error) - return - } - guard fcntl(fileDescriptor.rawValue, F_SETFL, flags | O_NONBLOCK) != -1 else { - let error: SubprocessError = .asyncIOFailed( - reason: "failed to set \(fileDescriptor.rawValue) to be non-blocking", - underlyingError: Errno(rawValue: errno) - ) - continuation.finish(throwing: error) + if let nonBlockingFdError = self.setNonblocking(for: fileDescriptor) { + continuation.finish(throwing: nonBlockingFdError) return } // Register event @@ -294,9 +289,16 @@ final class AsyncIO: Sendable { } } - private func removeRegistration(for fileDescriptor: FileDescriptor) throws(SubprocessError) { + internal func removeRegistration(for fileDescriptor: FileDescriptor) throws(SubprocessError) { switch self.state { case .success(let state): + let registration = _registration.withLock { store in + return store.removeValue(forKey: fileDescriptor.rawValue) + } + guard let registration else { + return + } + registration.finish() let rc = epoll_ctl( state.epollFileDescriptor, EPOLL_CTL_DEL, @@ -309,255 +311,10 @@ final class AsyncIO: Sendable { underlyingError: Errno(rawValue: errno) ) } - _registration.withLock { store in - _ = store.removeValue(forKey: fileDescriptor.rawValue) - } case .failure(let setupFailure): throw 
setupFailure } } } -extension AsyncIO { - - protocol _ContiguousBytes { - var count: Int { get } - - func withUnsafeBytes( - _ body: (UnsafeRawBufferPointer) throws -> ResultType - ) rethrows -> ResultType - } - - func read( - from diskIO: borrowing IOChannel, - upTo maxLength: Int - ) async throws(SubprocessError) -> [UInt8]? { - return try await self.read(from: diskIO.channel, upTo: maxLength) - } - - func read( - from fileDescriptor: FileDescriptor, - upTo maxLength: Int - ) async throws(SubprocessError) -> [UInt8]? { - guard maxLength > 0 else { - return nil - } - // If we are reading until EOF, start with readBufferSize - // and gradually increase buffer size - let bufferLength = maxLength == .max ? readBufferSize : maxLength - - var resultBuffer: [UInt8] = Array( - repeating: 0, count: bufferLength - ) - var readLength: Int = 0 - let signalStream = self.registerFileDescriptor(fileDescriptor, for: .read) - - do { - /// Outer loop: every iteration signals we are ready to read more data - for try await _ in signalStream { - /// Inner loop: repeatedly call `.read()` and read more data until: - /// 1. We reached EOF (read length is 0), in which case return the result - /// 2. We read `maxLength` bytes, in which case return the result - /// 3. `read()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In - /// this case we `break` out of the inner loop and wait `.read()` to be - /// ready by `await`ing the next signal in the outer loop. 
- while true { - let bytesRead = resultBuffer.withUnsafeMutableBufferPointer { bufferPointer in - // Get a pointer to the memory at the specified offset - let targetCount = bufferPointer.count - readLength - - let offsetAddress = bufferPointer.baseAddress!.advanced(by: readLength) - - // Read directly into the buffer at the offset - return _subprocess_read(fileDescriptor.rawValue, offsetAddress, targetCount) - } - let capturedErrno = errno - if bytesRead > 0 { - // Read some data - readLength += bytesRead - if maxLength == .max { - // Grow resultBuffer if needed - guard Double(readLength) > 0.8 * Double(resultBuffer.count) else { - continue - } - resultBuffer.append( - contentsOf: Array(repeating: 0, count: resultBuffer.count) - ) - } else if readLength >= maxLength { - // When we reached maxLength, return! - try self.removeRegistration(for: fileDescriptor) - return resultBuffer - } - } else if bytesRead == 0 { - // We reached EOF. Return whatever's left - try self.removeRegistration(for: fileDescriptor) - guard readLength > 0 else { - return nil - } - resultBuffer.removeLast(resultBuffer.count - readLength) - return resultBuffer - } else { - if self.shouldWaitForNextSignal(with: capturedErrno) { - // No more data for now wait for the next signal - break - } else { - // Throw all other errors - try self.removeRegistration(for: fileDescriptor) - throw SubprocessError.failedToReadFromProcess( - withUnderlyingError: Errno(rawValue: capturedErrno) - ) - } - } - } - } - } catch { - // Reset error code to .failedToRead to match other platforms - guard let originalError = error as? 
SubprocessError else { - throw SubprocessError.failedToReadFromProcess( - withUnderlyingError: nil - ) - } - throw SubprocessError.failedToReadFromProcess( - withUnderlyingError: originalError.underlyingError - ) - } - resultBuffer.removeLast(resultBuffer.count - readLength) - return resultBuffer - } - - func write( - _ array: [UInt8], - to diskIO: borrowing IOChannel - ) async throws(SubprocessError) -> Int { - return try await self._write(array, to: diskIO) - } - - func _write( - _ bytes: Bytes, - to diskIO: borrowing IOChannel - ) async throws(SubprocessError) -> Int { - guard bytes.count > 0 else { - return 0 - } - let fileDescriptor = diskIO.channel - let signalStream = self.registerFileDescriptor(fileDescriptor, for: .write) - var writtenLength: Int = 0 - do { - /// Outer loop: every iteration signals we are ready to read more data - for try await _ in signalStream { - /// Inner loop: repeatedly call `.write()` and write more data until: - /// 1. We've written bytes.count bytes. - /// 3. `.write()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In - /// this case we `break` out of the inner loop and wait `.write()` to be - /// ready by `await`ing the next signal in the outer loop. 
- while true { - let written = bytes.withUnsafeBytes { ptr in - let remainingLength = ptr.count - writtenLength - let startPtr = ptr.baseAddress!.advanced(by: writtenLength) - return _subprocess_write(fileDescriptor.rawValue, startPtr, remainingLength) - } - let capturedErrno = errno - if written > 0 { - writtenLength += written - if writtenLength >= bytes.count { - // Wrote all data - try self.removeRegistration(for: fileDescriptor) - return writtenLength - } - } else { - if self.shouldWaitForNextSignal(with: capturedErrno) { - // No more data for now wait for the next signal - break - } else { - // Throw all other errors - try self.removeRegistration(for: fileDescriptor) - throw SubprocessError.failedToWriteToProcess( - withUnderlyingError: Errno(rawValue: capturedErrno) - ) - } - } - } - } - } catch { - // Reset error code to .failedToWrite to match other platforms - guard let originalError = error as? SubprocessError else { - throw SubprocessError.failedToWriteToProcess( - withUnderlyingError: error as? SubprocessError.UnderlyingError - ) - } - throw SubprocessError.failedToWriteToProcess( - withUnderlyingError: originalError.underlyingError - ) - } - return 0 - } - - func write( - _ span: borrowing RawSpan, - to diskIO: borrowing IOChannel - ) async throws(SubprocessError) -> Int { - guard span.byteCount > 0 else { - return 0 - } - let fileDescriptor = diskIO.channel - let signalStream = self.registerFileDescriptor(fileDescriptor, for: .write) - var writtenLength: Int = 0 - do { - /// Outer loop: every iteration signals we are ready to read more data - for try await _ in signalStream { - /// Inner loop: repeatedly call `.write()` and write more data until: - /// 1. We've written bytes.count bytes. - /// 3. `.write()` returns -1 and sets `errno` to `EAGAIN` or `EWOULDBLOCK`. In - /// this case we `break` out of the inner loop and wait `.write()` to be - /// ready by `await`ing the next signal in the outer loop. 
- while true { - let written = span.withUnsafeBytes { ptr in - let remainingLength = ptr.count - writtenLength - let startPtr = ptr.baseAddress!.advanced(by: writtenLength) - return _subprocess_write(fileDescriptor.rawValue, startPtr, remainingLength) - } - let capturedErrno = errno - if written > 0 { - writtenLength += written - if writtenLength >= span.byteCount { - // Wrote all data - try self.removeRegistration(for: fileDescriptor) - return writtenLength - } - } else { - if self.shouldWaitForNextSignal(with: capturedErrno) { - // No more data for now wait for the next signal - break - } else { - // Throw all other errors - try self.removeRegistration(for: fileDescriptor) - throw SubprocessError.failedToWriteToProcess( - withUnderlyingError: Errno(rawValue: capturedErrno) - ) - } - } - } - } - } catch { - // Reset error code to .failedToWrite to match other platforms - guard let originalError = error as? SubprocessError else { - throw SubprocessError.failedToWriteToProcess( - withUnderlyingError: error as? SubprocessError.UnderlyingError - ) - } - throw SubprocessError.failedToWriteToProcess( - withUnderlyingError: originalError.underlyingError - ) - } - return 0 - } - - @inline(__always) - private func shouldWaitForNextSignal(with error: CInt) -> Bool { - return error == EAGAIN || error == EWOULDBLOCK || error == EINTR - } -} - -extension Array: AsyncIO._ContiguousBytes where Element == UInt8 {} - #endif // canImport(Glibc) || canImport(Android) || canImport(Musl) diff --git a/Sources/Subprocess/IO/AsyncIO+Unix.swift b/Sources/Subprocess/IO/AsyncIO+Unix.swift new file mode 100644 index 00000000..565aa077 --- /dev/null +++ b/Sources/Subprocess/IO/AsyncIO+Unix.swift @@ -0,0 +1,283 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift.org open source project +// +// Copyright (c) 2025 Apple Inc. 
and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if os(Linux) || os(Android) || SUBPROCESS_ASYNCIO_KQUEUE + +#if canImport(System) +import System +#else +import SystemPackage +#endif + +#if canImport(Darwin) +import Darwin + +#elseif canImport(Glibc) +import Glibc + +// https://github.com/torvalds/linux/blob/master/include/uapi/linux/fcntl.h +// Complex macro that can't be directly imported in Swift +private let F_GETPIPE_SZ: CInt = 1032 +#elseif canImport(Android) +import Android +import posix_filesystem.sys_epoll + +// https://github.com/torvalds/linux/blob/master/include/uapi/linux/fcntl.h +// Complex macro that can't be directly imported in Swift +private let F_GETPIPE_SZ: CInt = 1032 +#elseif canImport(Musl) +import Musl +#endif + +import _SubprocessCShims + +internal typealias SignalStream = AsyncThrowingStream + +extension AsyncIO { + internal enum Event { + case read + case write + } + + protocol _ContiguousBytes { + var count: Int { get } + + func withUnsafeBytes( + _ body: (UnsafeRawBufferPointer) throws -> ResultType + ) rethrows -> ResultType + } + + internal func setNonblocking(for fileDescriptor: FileDescriptor) -> SubprocessError? 
{ + let flags = fcntl(fileDescriptor.rawValue, F_GETFL) + guard flags != -1 else { + let error: SubprocessError = .asyncIOFailed( + reason: "failed to get flags for \(fileDescriptor.rawValue)", + underlyingError: Errno(rawValue: errno) + ) + return error + } + guard fcntl(fileDescriptor.rawValue, F_SETFL, flags | O_NONBLOCK) != -1 else { + let error: SubprocessError = .asyncIOFailed( + reason: "failed to set \(fileDescriptor.rawValue) to be non-blocking", + underlyingError: Errno(rawValue: errno) + ) + return error + } + return nil + } + + func read( + from diskIO: borrowing IODescriptor, + upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + return try await self.read(from: diskIO.descriptor(), upTo: maxLength) + } + + func read( + from fileDescriptor: FileDescriptor, + upTo maxLength: Int + ) async throws(SubprocessError) -> [UInt8]? { + guard maxLength > 0 else { + return nil + } + let bufferLength: Int + if maxLength == .max { + // Prevent OOM allocation + bufferLength = Self.queryPipeBufferSize(for: fileDescriptor) + } else { + bufferLength = maxLength + } + + var resultBuffer: [UInt8] = Array( + repeating: 0, count: bufferLength + ) + let signalStream = self.registerFileDescriptor(fileDescriptor, for: .read) + + do { + /// Outer loop: every iteration signals we are ready to read more data + for try await _ in signalStream { + /// Inner loop: call `.read()` until one of the following happens: + /// 1. We reached EOF (read length is 0), in which case return `nil` + /// 2. We read some data (at most `maxLength` bytes), in which case return it immediately + /// 3. `FileDescriptor.read()` throws `Errno` with `EAGAIN` or `EWOULDBLOCK`. In + /// this case we `break` out of the inner loop and wait `.read()` to be + /// ready by `await`ing the next signal in the outer loop.
+ while true { + let bytesRead: Int + do { + bytesRead = try resultBuffer.withUnsafeMutableBytes { bufferPointer in + // Read directly into the buffer at the offset + return try fileDescriptor.read( + into: bufferPointer, + retryOnInterrupt: true + ) + } + } catch { + // FileDescriptor.read only throws Errno + let _errno = error as! Errno + + if self.shouldWaitForNextSignal(with: _errno.rawValue) { + // No more data for now wait for the next signal + break + } else { + // Throw all other errors + try self.removeRegistration(for: fileDescriptor) + throw SubprocessError.failedToReadFromProcess( + withUnderlyingError: _errno + ) + } + } + + if bytesRead > 0 { + // Read some data + // Return immediately so the caller can + // process it without waiting for the buffer to fill. + try self.removeRegistration(for: fileDescriptor) + resultBuffer.removeLast(resultBuffer.count - bytesRead) + return resultBuffer + } else if bytesRead == 0 { + // We reached EOF. + try self.removeRegistration(for: fileDescriptor) + return nil + } else { + // Should not happen + throw SubprocessError.failedToReadFromProcess( + withUnderlyingError: nil + ) + } + } + } + } catch { + try self.removeRegistration(for: fileDescriptor) + // Reset error code to .failedToRead to match other platforms + guard let originalError = error as? 
SubprocessError else { + throw SubprocessError.failedToReadFromProcess( + withUnderlyingError: nil + ) + } + throw SubprocessError.failedToReadFromProcess( + withUnderlyingError: originalError.underlyingError + ) + } + return nil + } + + func write( + _ array: [UInt8], + to diskIO: borrowing IODescriptor + ) async throws(SubprocessError) -> Int { + return try await self.write(array._bytes, to: diskIO) + } + + func write( + _ span: borrowing RawSpan, + to diskIO: borrowing IODescriptor + ) async throws(SubprocessError) -> Int { + guard span.byteCount > 0 else { + return 0 + } + let fileDescriptor = diskIO.descriptor() + let signalStream = self.registerFileDescriptor(fileDescriptor, for: .write) + var writtenLength: Int = 0 + do { + /// Outer loop: every iteration signals we are ready to write more data + for try await _ in signalStream { + /// Inner loop: repeatedly call `.write()` and write more data until: + /// 1. We've written `span.byteCount` bytes. + /// 2. `FileDescriptor.write()` throws `Errno` with `EAGAIN` or `EWOULDBLOCK`. In + /// this case we `break` out of the inner loop and wait `.write()` to be + /// ready by `await`ing the next signal in the outer loop. + while true { + let written: Int + do { + written = try span.extracting( + last: span.byteCount - writtenLength + ).withUnsafeBytes { + return try fileDescriptor.write( + $0, + retryOnInterrupt: true + ) + } + } catch { + // FileDescriptor.write only throws Errno + let _errno = error as!
Errno + + if self.shouldWaitForNextSignal(with: _errno.rawValue) { + // No more data for now wait for the next signal + break + } else { + // Throw all other errors + try self.removeRegistration(for: fileDescriptor) + throw SubprocessError.failedToWriteToProcess( + withUnderlyingError: _errno + ) + } + } + + writtenLength += written + if writtenLength >= span.byteCount { + // Wrote all data + try self.removeRegistration(for: fileDescriptor) + return writtenLength + } + } + } + } catch { + // Reset error code to .failedToWrite to match other platforms + guard let originalError = error as? SubprocessError else { + throw SubprocessError.failedToWriteToProcess( + withUnderlyingError: error as? SubprocessError.UnderlyingError + ) + } + throw SubprocessError.failedToWriteToProcess( + withUnderlyingError: originalError.underlyingError + ) + } + return 0 + } + + @inline(__always) + private func shouldWaitForNextSignal(with error: CInt) -> Bool { + return error == EAGAIN || error == EWOULDBLOCK + } + + static func queryPipeBufferSize(for fileDescriptor: IODescriptor.Descriptor) -> Int { + #if os(Linux) || os(Android) + + // Works on glibc, musl, and Bionic since kernel 2.6.35 + let sz = fcntl(fileDescriptor.rawValue, F_GETPIPE_SZ) + // Fall back to page size + return sz > 0 ? Int(sz) : systemPageSize + #elseif canImport(Darwin) || os(OpenBSD) + // XNU and OpenBSD both set st_blksize to the pipe's current capacity + // in pipe_stat(). Undocumented on Darwin but stable; verify with a unit test. + var st = stat() + guard + fstat( + fileDescriptor.rawValue, &st + ) == 0, st.st_blksize > 0 + else { + // Fall back to page size + return systemPageSize + } + return Int(st.st_blksize) + #else + // FreeBSD does not have `st.st_blksize` equivalent.
+ // pipe_stat() hardcodes st.st_blksize = PAGE_SIZE + // Use 64kb like other platforms + return 64 * 1024 + #endif + } +} + +extension Array: AsyncIO._ContiguousBytes where Element == UInt8 {} + +#endif diff --git a/Sources/Subprocess/IO/AsyncIO+Windows.swift b/Sources/Subprocess/IO/AsyncIO+Windows.swift index b3cfbce3..adc6babd 100644 --- a/Sources/Subprocess/IO/AsyncIO+Windows.swift +++ b/Sources/Subprocess/IO/AsyncIO+Windows.swift @@ -243,10 +243,10 @@ final class AsyncIO: @unchecked Sendable { } func read( - from diskIO: borrowing IOChannel, + from diskIO: borrowing IODescriptor, upTo maxLength: Int ) async throws(SubprocessError) -> [UInt8]? { - return try await self.read(from: diskIO.channel, upTo: maxLength) + return try await self.read(from: diskIO.descriptor(), upTo: maxLength) } func read( @@ -256,115 +256,97 @@ final class AsyncIO: @unchecked Sendable { guard maxLength > 0 else { return nil } - // If we are reading until EOF, start with readBufferSize - // and gradually increase buffer size - let bufferLength = maxLength == .max ? readBufferSize : maxLength + let bufferLength: Int + if maxLength == .max { + // Prevent OOM allocation + bufferLength = Self.queryPipeBufferSize(for: handle) + } else { + bufferLength = maxLength + } var resultBuffer: [UInt8] = Array( repeating: 0, count: bufferLength ) - var readLength: Int = 0 var signalStream = self.registerHandle(handle).makeAsyncIterator() - while true { - // We use an empty `_OVERLAPPED()` here because `ReadFile` below - // only reads non-seekable files, aka pipes. - var overlapped = _OVERLAPPED() - let succeed = resultBuffer.withUnsafeMutableBufferPointer { bufferPointer in - // Get a pointer to the memory at the specified offset - // Windows ReadFile uses DWORD for target count, which means we can only - // read up to DWORD (aka UInt32) max. 
- let targetCount: DWORD = self.calculateRemainingCount( - totalCount: bufferPointer.count, - readCount: readLength - ) - - let offsetAddress = bufferPointer.baseAddress!.advanced(by: readLength) - // Read directly into the buffer at the offset - return ReadFile( - handle, - offsetAddress, - targetCount, - nil, - &overlapped - ) - } + // We use an empty `_OVERLAPPED()` here because `ReadFile` below + // only reads non-seekable files, aka pipes. + var overlapped = _OVERLAPPED() + let succeed = resultBuffer.withUnsafeMutableBufferPointer { bufferPointer in + // Get a pointer to the memory at the specified offset + // Windows ReadFile uses DWORD for target count, which means we can only + // read up to DWORD (aka UInt32) max. + let targetCount: DWORD = self.calculateRemainingCount( + totalCount: bufferPointer.count, + readCount: 0 + ) - if !succeed { - // It is expected `ReadFile` to return `false` in async mode. - // Make sure we only get `ERROR_IO_PENDING` or `ERROR_BROKEN_PIPE` - let lastError = GetLastError() - if lastError == ERROR_BROKEN_PIPE { - // We reached EOF. Return whatever's left - guard readLength > 0 else { - return nil - } - resultBuffer.removeLast(resultBuffer.count - readLength) - return resultBuffer - } - guard lastError == ERROR_IO_PENDING else { - let error: SubprocessError = .failedToReadFromProcess( - withUnderlyingError: SubprocessError.WindowsError(rawValue: lastError) - ) - throw error - } + // Read directly into the buffer at the offset + return ReadFile( + handle, + bufferPointer.baseAddress!, + targetCount, + nil, + &overlapped + ) + } + if !succeed { + // It is expected `ReadFile` to return `false` in async mode. + // Make sure we only get `ERROR_IO_PENDING` or `ERROR_BROKEN_PIPE` + let lastError = GetLastError() + if lastError == ERROR_BROKEN_PIPE { + // We reached EOF before any data was read + return nil } - // Now wait for read to finish - let bytesRead: DWORD - do { - bytesRead = try await signalStream.next() ?? 
0 - } catch { - if let subprocessError = error as? SubprocessError { - throw subprocessError - } - throw SubprocessError.failedToReadFromProcess( - withUnderlyingError: error as? SubprocessError.UnderlyingError + guard lastError == ERROR_IO_PENDING else { + let error: SubprocessError = .failedToReadFromProcess( + withUnderlyingError: SubprocessError.WindowsError(rawValue: lastError) ) + throw error } - if bytesRead == 0 { - // We reached EOF. Return whatever's left - guard readLength > 0 else { - return nil - } - resultBuffer.removeLast(resultBuffer.count - readLength) - return resultBuffer - } else { - // Read some data - readLength += Int(truncatingIfNeeded: bytesRead) - if maxLength == .max { - // Grow resultBuffer if needed - guard Double(readLength) > 0.8 * Double(resultBuffer.count) else { - continue - } - resultBuffer.append( - contentsOf: Array(repeating: 0, count: resultBuffer.count) - ) - } else if readLength >= maxLength { - // When we reached maxLength, return! - return resultBuffer - } + } + // Now wait for read to finish + let bytesRead: DWORD + do { + bytesRead = try await signalStream.next() ?? 0 + } catch { + if let subprocessError = error as? SubprocessError { + throw subprocessError } + throw SubprocessError.failedToReadFromProcess( + withUnderlyingError: error as? SubprocessError.UnderlyingError + ) } + + if bytesRead == 0 { + // EOF + return nil + } + + // Got data — return immediately so the caller can process it + // without waiting for the buffer to fill. 
+ resultBuffer.removeLast(resultBuffer.count - Int(truncatingIfNeeded: bytesRead)) + return resultBuffer } func write( _ array: [UInt8], - to diskIO: borrowing IOChannel + to diskIO: borrowing IODescriptor ) async throws(SubprocessError) -> Int { - return try await self._write(array, to: diskIO) + return try await self.write(array._bytes, to: diskIO) } func write( _ span: borrowing RawSpan, - to diskIO: borrowing IOChannel + to diskIO: borrowing IODescriptor ) async throws(SubprocessError) -> Int { guard span.byteCount > 0 else { return 0 } - let handle = diskIO.channel - var signalStream = self.registerHandle(diskIO.channel).makeAsyncIterator() + let handle = diskIO.descriptor() + var signalStream = self.registerHandle(handle).makeAsyncIterator() var writtenLength: Int = 0 while true { // We use an empty `_OVERLAPPED()` here because `WriteFile` below @@ -420,67 +402,6 @@ final class AsyncIO: @unchecked Sendable { } } - func _write( - _ bytes: Bytes, - to diskIO: borrowing IOChannel - ) async throws(SubprocessError) -> Int { - guard bytes.count > 0 else { - return 0 - } - let handle = diskIO.channel - var signalStream = self.registerHandle(diskIO.channel).makeAsyncIterator() - var writtenLength: Int = 0 - while true { - // We use an empty `_OVERLAPPED()` here because `WriteFile` below - // only writes to non-seekable files, aka pipes. - var overlapped = _OVERLAPPED() - let succeed = bytes.withUnsafeBytes { ptr in - // Windows WriteFile uses DWORD for target count - // which means we can only write up to DWORD max - let remainingLength: DWORD = self.calculateRemainingCount( - totalCount: ptr.count, - readCount: writtenLength - ) - let startPtr = ptr.baseAddress!.advanced(by: writtenLength) - return WriteFile( - handle, - startPtr, - remainingLength, - nil, - &overlapped - ) - } - - if !succeed { - // It is expected `WriteFile` to return `false` in async mode. 
- // Make sure we only get `ERROR_IO_PENDING` - let lastError = GetLastError() - guard lastError == ERROR_IO_PENDING else { - let error: SubprocessError = .failedToWriteToProcess( - withUnderlyingError: SubprocessError.WindowsError(rawValue: lastError) - ) - throw error - } - } - // Now wait for write to finish - let bytesWritten: DWORD - do { - bytesWritten = try await signalStream.next() ?? 0 - } catch { - if let subprocessError = error as? SubprocessError { - throw subprocessError - } - throw SubprocessError.failedToWriteToProcess( - withUnderlyingError: error as? SubprocessError.UnderlyingError - ) - } - writtenLength += Int(truncatingIfNeeded: bytesWritten) - if writtenLength >= bytes.count { - return writtenLength - } - } - } - // Windows ReadFile uses DWORD for target count, which means we can only // read up to DWORD (aka UInt32) max. private func calculateRemainingCount(totalCount: Int, readCount: Int) -> DWORD { @@ -493,6 +414,12 @@ final class AsyncIO: @unchecked Sendable { return DWORD(truncatingIfNeeded: min(totalCount - readCount, Int(DWORD.max))) } } + + static func queryPipeBufferSize(for fileDescriptor: IODescriptor.Descriptor) -> Int { + // Windows does not provide an API to query pipe buffer size. + // Node `getDefaultHighWaterMark` uses 16k + return 16 * 1024 + } } extension Array: AsyncIO._ContiguousBytes where Element == UInt8 {} diff --git a/Sources/Subprocess/IO/Input.swift b/Sources/Subprocess/IO/Input.swift index b616d6ea..9d249940 100644 --- a/Sources/Subprocess/IO/Input.swift +++ b/Sources/Subprocess/IO/Input.swift @@ -237,9 +237,9 @@ extension InputProtocol { /// A writer that sends data to the standard input of a subprocess. 
public final actor StandardInputWriter: Sendable { - internal var diskIO: IOChannel + internal var diskIO: IODescriptor - init(diskIO: consuming IOChannel) { + init(diskIO: consuming IODescriptor) { self.diskIO = diskIO } diff --git a/Sources/Subprocess/IO/Output.swift b/Sources/Subprocess/IO/Output.swift index b0a14495..602bc3bd 100644 --- a/Sources/Subprocess/IO/Output.swift +++ b/Sources/Subprocess/IO/Output.swift @@ -19,8 +19,6 @@ public import SystemPackage @preconcurrency import WinSDK #endif -internal import Dispatch - // MARK: - Output /// A type that serves as the output target for a subprocess. @@ -127,35 +125,42 @@ public struct BytesOutput: OutputProtocol, ErrorOutputProtocol { public let maxSize: Int internal func captureOutput( - from diskIO: consuming IOChannel + from diskIO: consuming IODescriptor ) async throws(SubprocessError) -> [UInt8] { - #if SUBPROCESS_ASYNCIO_DISPATCH - var result: DispatchData? = nil - #else - var result: [UInt8]? = nil - #endif + var result: [UInt8] = [] do { var maxLength = self.maxSize if maxLength != .max { // If we actually have a max length, attempt to read one // more byte to determine whether output exceeds the limit maxLength += 1 + // We can also reserve capacity + result.reserveCapacity(maxLength) + } + let bufferSize = AsyncIO.queryPipeBufferSize(for: diskIO.descriptor()) + + while result.count < maxLength { + let remaining = maxLength - result.count + guard + let chunk = try await AsyncIO.shared.read( + from: diskIO, + upTo: min(bufferSize, remaining) + ) + else { + break + } + result.append(contentsOf: chunk) } - result = try await AsyncIO.shared.read(from: diskIO, upTo: maxLength) } catch { try diskIO.safelyClose() throw error } try diskIO.safelyClose() - if let result, result.count > self.maxSize { + if result.count > self.maxSize { throw .outputLimitExceeded(limit: self.maxSize) } - #if SUBPROCESS_ASYNCIO_DISPATCH - return result?.array() ?? [] - #else - return result ?? 
[] - #endif + return result } /// Creates an array from a ``RawSpan``. @@ -317,7 +322,7 @@ extension OutputProtocol { /// Capture the output from the subprocess up to maxSize @_disfavoredOverload internal func captureOutput( - from diskIO: consuming IOChannel? + from diskIO: consuming IODescriptor? ) async throws -> OutputType { if OutputType.self == Void.self { try diskIO?.safelyClose() @@ -337,39 +342,45 @@ extension OutputProtocol { return try await bytesOutput.captureOutput(from: diskIO) as! Self.OutputType } - #if SUBPROCESS_ASYNCIO_DISPATCH - var result: DispatchData? = nil - #else - var result: [UInt8]? = nil - #endif + var result: [UInt8] = [] do { var maxLength = self.maxSize if maxLength != .max { // If we actually have a max length, attempt to read one // more byte to determine whether output exceeds the limit maxLength += 1 + result.reserveCapacity(maxLength) + } + let bufferSize = AsyncIO.queryPipeBufferSize(for: diskIO.descriptor()) + + while result.count < maxLength { + let remaining = maxLength - result.count + guard + let chunk = try await AsyncIO.shared.read( + from: diskIO, + upTo: min(bufferSize, remaining) + ) + else { + break + } + result.append(contentsOf: chunk) } - result = try await AsyncIO.shared.read(from: diskIO, upTo: maxLength) } catch { try diskIO.safelyClose() throw error } try diskIO.safelyClose() - if let result, result.count > self.maxSize { + if result.count > self.maxSize { throw SubprocessError.outputLimitExceeded(limit: self.maxSize) } - #if SUBPROCESS_ASYNCIO_DISPATCH - return try self.output(from: result ?? .empty) - #else - return try self.output(from: result ?? []) - #endif + return try self.output(from: result) } } extension OutputProtocol where OutputType == Void { - internal func captureOutput(from fileDescriptor: consuming IOChannel?) async throws {} + internal func captureOutput(from fileDescriptor: consuming IODescriptor?) async throws {} /// Converts the output from a raw span to the expected output type. 
public func output(from span: RawSpan) throws { @@ -379,21 +390,6 @@ extension OutputProtocol where OutputType == Void { } extension OutputProtocol { - #if SUBPROCESS_ASYNCIO_DISPATCH - internal func output(from data: DispatchData) throws -> OutputType { - guard !data.isEmpty else { - let empty = UnsafeRawBufferPointer(start: nil, count: 0) - let span = RawSpan(_unsafeBytes: empty) - return try self.output(from: span) - } - - return try data.withUnsafeBytes { ptr in - let bufferPtr = UnsafeRawBufferPointer(start: ptr, count: data.count) - let span = RawSpan(_unsafeBytes: bufferPtr) - return try self.output(from: span) - } - } - #else internal func output(from data: [UInt8]) throws -> OutputType { guard !data.isEmpty else { let empty = UnsafeRawBufferPointer(start: nil, count: 0) @@ -406,22 +402,6 @@ extension OutputProtocol { return try self.output(from: span) } } - #endif // SUBPROCESS_ASYNCIO_DISPATCH -} - -extension DispatchData { - internal func array() -> [UInt8] { - var result: [UInt8]? - self.enumerateBytes { buffer, byteIndex, stop in - let currentChunk = Array(UnsafeRawBufferPointer(buffer)) - if result == nil { - result = currentChunk - } else { - result?.append(contentsOf: currentChunk) - } - } - return result ?? 
[] - } } extension FileDescriptor { diff --git a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift index ce832dff..0f52ae84 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift @@ -478,9 +478,9 @@ extension Configuration { ) return SpawnResult( execution: execution, - inputWriteEnd: inputWriteFileDescriptor?.createIOChannel(), - outputReadEnd: outputReadFileDescriptor?.createIOChannel(), - errorReadEnd: errorReadFileDescriptor?.createIOChannel() + inputWriteEnd: inputWriteFileDescriptor, + outputReadEnd: outputReadFileDescriptor, + errorReadEnd: errorReadFileDescriptor ) } diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index 403c0461..6024b7ba 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -9,23 +9,6 @@ // //===----------------------------------------------------------------------===// -#if canImport(Glibc) -import Glibc -let _subprocess_read = Glibc.read -let _subprocess_write = Glibc.write -let _subprocess_close = Glibc.close -#elseif canImport(Android) -import Android -let _subprocess_read = Android.read -let _subprocess_write = Android.write -let _subprocess_close = Android.close -#elseif canImport(Musl) -import Musl -let _subprocess_read = Musl.read -let _subprocess_write = Musl.write -let _subprocess_close = Musl.close -#endif - #if os(Linux) || os(Android) #if canImport(System) @@ -34,6 +17,14 @@ import System import SystemPackage #endif +#if canImport(Glibc) +import Glibc +#elseif canImport(Android) +import Android +#elseif canImport(Musl) +import Musl +#endif + internal import Dispatch import Synchronization @@ -198,7 +189,10 @@ private func shutdown() { var one: UInt64 = 1 // Wake up the thread for shutdown - _ = _subprocess_write(storage.shutdownFileDescriptor, &one, 
MemoryLayout.size) + withUnsafeBytes(of: &one) { ptr in + _ = try? FileDescriptor(rawValue: storage.shutdownFileDescriptor) + .write(ptr, retryOnInterrupt: true) + } // Cleanup the monitor thread pthread_join(storage.monitorThread, nil) } @@ -213,7 +207,10 @@ private func signalHandler( ) { let savedErrno = errno var one: UInt8 = 1 - _ = _subprocess_write(_signalPipe.writeEnd, &one, 1) + withUnsafeBytes(of: &one) { ptr in + _ = try? FileDescriptor(rawValue: _signalPipe.writeEnd) + .write(ptr, retryOnInterrupt: true) + } errno = savedErrno } @@ -268,7 +265,11 @@ private func monitorThreadFunc(context: MonitorThreadContext) { // from the shutdownFD if targetFileDescriptor == context.shutdownFileDescriptor { var buf: UInt64 = 0 - _ = _subprocess_read(context.shutdownFileDescriptor, &buf, MemoryLayout.size) + withUnsafeMutableBytes(of: &buf) { ptr in + _ = try? FileDescriptor( + rawValue: context.shutdownFileDescriptor + ).read(into: ptr, retryOnInterrupt: true) + } break monitorLoop } @@ -456,7 +457,11 @@ private func _reapAllKnownChildProcesses(_ signalFd: CInt, context: MonitorThrea // Drain the signalFd var buffer: UInt8 = 0 - while _subprocess_read(signalFd, &buffer, 1) > 0 { /* noop, drain the pipe */ } + withUnsafeMutableBytes(of: &buffer) { ptr in + while (try? FileDescriptor(rawValue: signalFd).read(into: ptr) > 0) ?? 
false { + /* noop, drain the pipe */ + } + } let resumingContinuations: [ResultContinuation] = _processMonitorState.withLock { state in guard case .started(let storage) = state else { diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 6655ff9d..5228e882 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -559,9 +559,9 @@ extension Configuration { ) return SpawnResult( execution: execution, - inputWriteEnd: inputWriteFileDescriptor?.createIOChannel(), - outputReadEnd: outputReadFileDescriptor?.createIOChannel(), - errorReadEnd: errorReadFileDescriptor?.createIOChannel() + inputWriteEnd: inputWriteFileDescriptor, + outputReadEnd: outputReadFileDescriptor, + errorReadEnd: errorReadFileDescriptor ) } @@ -615,7 +615,7 @@ public struct ProcessIdentifier: Sendable, Hashable { internal func close() { if self.processDescriptor > 0 { - _ = _subprocess_close(self.processDescriptor) + try? 
FileDescriptor(rawValue: self.processDescriptor).close() } } } diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index 7d07fd6f..02fc77df 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -220,9 +220,9 @@ extension Configuration { return SpawnResult( execution: execution, - inputWriteEnd: inputWriteFileDescriptor?.createIOChannel(), - outputReadEnd: outputReadFileDescriptor?.createIOChannel(), - errorReadEnd: errorReadFileDescriptor?.createIOChannel() + inputWriteEnd: inputWriteFileDescriptor, + outputReadEnd: outputReadFileDescriptor, + errorReadEnd: errorReadFileDescriptor ) } @@ -430,9 +430,9 @@ extension Configuration { return SpawnResult( execution: execution, - inputWriteEnd: inputWriteFileDescriptor?.createIOChannel(), - outputReadEnd: outputReadFileDescriptor?.createIOChannel(), - errorReadEnd: errorReadFileDescriptor?.createIOChannel() + inputWriteEnd: inputWriteFileDescriptor, + outputReadEnd: outputReadFileDescriptor, + errorReadEnd: errorReadFileDescriptor ) } @@ -1533,7 +1533,11 @@ internal func fillNullTerminatedWideStringBuffer( return result } } catch { + #if swift(>=6.3) + throw error + #else throw error as! 
SubprocessError.WindowsError + #endif } } throw SubprocessError.WindowsError(rawValue: DWORD(ERROR_INSUFFICIENT_BUFFER)) diff --git a/Sources/Subprocess/Span+Subprocess.swift b/Sources/Subprocess/Span+Subprocess.swift index 0de1c5fa..cc27f2c8 100644 --- a/Sources/Subprocess/Span+Subprocess.swift +++ b/Sources/Subprocess/Span+Subprocess.swift @@ -48,26 +48,16 @@ extension Span where Element: BitwiseCopyable { } } -#if canImport(Glibc) || canImport(Bionic) || canImport(Musl) -internal import Dispatch - -extension DispatchData { - var bytes: RawSpan { - _read { - if self.count == 0 { - let empty = UnsafeRawBufferPointer(start: nil, count: 0) - let span = RawSpan(_unsafeBytes: empty) - yield _overrideLifetime(of: span, to: self) - } else { - // FIXME: We cannot get a stable ptr out of DispatchData. - // For now revert back to copy - let array = Array(self) - let ptr = array.withUnsafeBytes { return $0 } - let span = RawSpan(_unsafeBytes: ptr) - yield _overrideLifetime(of: span, to: self) - } +extension Array where Element: BitwiseCopyable { + // swift-format-ignore + // Access the storage backing this Buffer + internal var _bytes: RawSpan { + @_lifetime(borrow self) + borrowing get { + let ptr = self.withUnsafeBytes { $0 } + let bytes = RawSpan(_unsafeBytes: ptr) + return _overrideLifetime(of: bytes, to: self) } } } -#endif // canImport(Glibc) || canImport(Bionic) || canImport(Musl) diff --git a/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift b/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift index 6b25482b..b6dae6e7 100644 --- a/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift +++ b/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift @@ -149,47 +149,15 @@ extension StandardInputWriter { } } -#if SUBPROCESS_ASYNCIO_DISPATCH -extension AsyncIO { - internal func write( - _ data: Data, - to diskIO: borrowing IOChannel - ) async throws(SubprocessError) -> Int { - try await _castError { - try await 
withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - let dispatchData = data.withUnsafeBytes { - return DispatchData( - bytesNoCopy: $0, - deallocator: .custom( - nil, - { - // noop - } - ) - ) - } - self.write(dispatchData, to: diskIO) { writtenLength, error in - if let error = error { - continuation.resume(throwing: error) - } else { - continuation.resume(returning: writtenLength) - } - } - } - } - } -} -#else extension Data: AsyncIO._ContiguousBytes {} extension AsyncIO { internal func write( _ data: Data, - to diskIO: borrowing IOChannel + to diskIO: borrowing IODescriptor ) async throws(SubprocessError) -> Int { - return try await self._write(data, to: diskIO) + return try await self.write(data.bytes, to: diskIO) } } -#endif // SUBPROCESS_ASYNCIO_DISPATCH #endif // SubprocessFoundation diff --git a/Sources/_SubprocessCShims/include/process_shims.h b/Sources/_SubprocessCShims/include/process_shims.h index c87b9b87..9f03a635 100644 --- a/Sources/_SubprocessCShims/include/process_shims.h +++ b/Sources/_SubprocessCShims/include/process_shims.h @@ -46,7 +46,11 @@ int _subprocess_pthread_create( #else pthread_t * _Nonnull ptr, #endif - pthread_attr_t const * _Nullable attr, +#if defined(__FreeBSD__) || defined(__OpenBSD__) + const pthread_attr_t _Nullable * _Nullable attr, +#else + const pthread_attr_t * _Nullable attr, +#endif void * _Nullable (* _Nonnull start)(void * _Nullable), void * _Nullable context ); diff --git a/Tests/SubprocessTests/AsyncIOTests.swift b/Tests/SubprocessTests/AsyncIOTests.swift index 0626df12..edbda831 100644 --- a/Tests/SubprocessTests/AsyncIOTests.swift +++ b/Tests/SubprocessTests/AsyncIOTests.swift @@ -46,12 +46,12 @@ extension SubprocessAsyncIOTests { @Test func testBasicReadWrite() async throws { let testData = randomData(count: 1024) try await runReadWriteTest { readIO, readTestBed in - let readData = try #require( - try await readIO.read(from: readTestBed.ioChannel, upTo: .max) + let readData = try await 
readUntilEOF( + from: readTestBed.ioDescriptor, with: readIO ) #expect(Array(readData) == testData) } writer: { writeIO, writeTestBed in - _ = try await writeIO.write(testData, to: writeTestBed.ioChannel) + _ = try await writeIO.write(testData, to: writeTestBed.ioDescriptor) try await writeTestBed.finish() } } @@ -64,24 +64,32 @@ extension SubprocessAsyncIOTests { } for _ in 0..<10 { // Generate some that are longer than buffer size - _chunks.append(randomData(count: Int.random(in: Subprocess.readBufferSize.. [UInt8] { + var result: [UInt8] = [] + let bufferSize = AsyncIO.queryPipeBufferSize( + for: ioDescriptor.descriptor() + ) + while let chunk = try await asyncIO.read(from: ioDescriptor, upTo: bufferSize) { + result.append(contentsOf: chunk) + } + return result + } + + func read( + until readLength: Int, + from ioDescriptor: borrowing IODescriptor, + with asyncIO: AsyncIO + ) async throws -> [UInt8] { + guard readLength < .max else { + fatalError("Use readUntilEOF instead") + } + var result: [UInt8] = [] + result.reserveCapacity(readLength) + let bufferSize = AsyncIO.queryPipeBufferSize( + for: ioDescriptor.descriptor() + ) + while result.count < readLength { + let targetLength = min(bufferSize, readLength - result.count) + guard + let chunk = try await asyncIO.read( + from: ioDescriptor, upTo: targetLength + ) + else { + break + } + result.append(contentsOf: chunk) + } + return result + } } extension SubprocessAsyncIOTests.TestBed { consuming func finish() async throws { - #if SUBPROCESS_ASYNCIO_DISPATCH - try _safelyClose(.dispatchIO(self.ioChannel.channel)) - #elseif canImport(WinSDK) - try _safelyClose(.handle(self.ioChannel.channel)) + #if canImport(WinSDK) + try _safelyClose(.handle(self.ioDescriptor.descriptor())) #else - try _safelyClose(.fileDescriptor(self.ioChannel.channel)) + try _safelyClose(.fileDescriptor(self.ioDescriptor.descriptor())) #endif } diff --git a/Tests/SubprocessTests/IntegrationTests.swift 
b/Tests/SubprocessTests/IntegrationTests.swift index 9b729f4e..810b7b05 100644 --- a/Tests/SubprocessTests/IntegrationTests.swift +++ b/Tests/SubprocessTests/IntegrationTests.swift @@ -1864,7 +1864,6 @@ extension SubprocessIntegrationTests { setup, input: .none, error: .discarded, - preferredBufferSize: 1 ) { execution, standardOutput in for try await line in standardOutput.strings() { // If we use default buffer size this test will hang @@ -2154,9 +2153,9 @@ extension SubprocessIntegrationTests { let length: Int switch size { case .large: - length = Int(Double.random(in: 1.0..<2.0) * Double(readBufferSize)) + 1 + length = Int(Double.random(in: 1.0..<2.0) * Double(systemPageSize)) + 1 case .medium: - length = Int(Double.random(in: 0.2..<1.0) * Double(readBufferSize)) + 1 + length = Int(Double.random(in: 0.2..<1.0) * Double(systemPageSize)) + 1 case .small: length = Int.random(in: 1..<16) } @@ -2989,7 +2988,6 @@ func _run< _ setup: TestSetup, input: Input, error: Error, - preferredBufferSize: Int? = nil, body: ((Execution, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionOutcome where Error.OutputType == Void { return try await Subprocess.run( @@ -2999,7 +2997,6 @@ func _run< workingDirectory: setup.workingDirectory, input: input, error: error, - preferredBufferSize: preferredBufferSize, body: body ) } diff --git a/Tests/SubprocessTests/UnixTests.swift b/Tests/SubprocessTests/UnixTests.swift index f09d24c3..076c1c7a 100644 --- a/Tests/SubprocessTests/UnixTests.swift +++ b/Tests/SubprocessTests/UnixTests.swift @@ -328,8 +328,7 @@ extension SubprocessUnixTests { """, ], platformOptions: platformOptions, - error: .fileDescriptor(.standardError, closeAfterSpawningProcess: false), - preferredBufferSize: 1 + error: .fileDescriptor(.standardError, closeAfterSpawningProcess: false) ) { execution, standardOutput in // Read stdout incrementally. Once we see the PID line, // we know the trap is set up and it's safe to send SIGINT. 
@@ -414,8 +413,7 @@ extension SubprocessUnixTests { """, ], platformOptions: platformOptions, - error: .fileDescriptor(.standardError, closeAfterSpawningProcess: false), - preferredBufferSize: 1 + error: .fileDescriptor(.standardError, closeAfterSpawningProcess: false) ) { _, standardOutput in var grandChildPid: pid_t? for try await line in standardOutput.strings() {