From 274ca48ef1e52fafadb1b96a00dd3085aa6af4df Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Wed, 13 May 2026 13:04:50 -0700 Subject: [PATCH] Collapse run() overloads behind a generic Execution type Reduce the 14+ run() overloads to two (one executable-based, one configuration-based) by making Execution generic over its input, output, and error types. The body closure now takes a single Execution, and type-conditional extensions expose standardInputWriter, standardOutput, and standardError only for the matching stream types. Companion changes: - Merge ExecutionOutcome into ExecutionResult, which now carries the body closure's return value via closureOutput. Now all run() methods return ExecutionResult. ExecutionOutcome remains as an internal helper. - Expose CustomWriteInput and SequenceOutput publicly, along with the .inputWriter and .sequence factories. Callers opt in per stream: input: .inputWriter unlocks execution.standardInputWriter; output: .sequence unlocks execution.standardOutput; error: .sequence unlocks execution.standardError. --- README.md | 123 +-- Sources/Subprocess/API.swift | 944 ++++-------------- Sources/Subprocess/AsyncBufferSequence.swift | 48 +- Sources/Subprocess/Configuration.swift | 32 +- Sources/Subprocess/Execution.swift | 71 +- Sources/Subprocess/IO/Input.swift | 32 +- Sources/Subprocess/IO/Output.swift | 26 +- .../Platforms/Subprocess+Darwin.swift | 5 +- .../Platforms/Subprocess+Unix.swift | 10 +- .../Platforms/Subprocess+Windows.swift | 10 +- Sources/Subprocess/Result.swift | 89 +- .../Input+Foundation.swift | 4 +- Tests/SubprocessTests/DarwinTests.swift | 5 +- Tests/SubprocessTests/IntegrationTests.swift | 397 +++++--- Tests/SubprocessTests/LinuxTests.swift | 6 +- .../ProcessMonitoringTests.swift | 14 +- Tests/SubprocessTests/UnixTests.swift | 24 +- Tests/SubprocessTests/WindowsTests.swift | 16 +- 18 files changed, 783 insertions(+), 1073 deletions(-) diff --git a/README.md b/README.md index df1da889..0b48379e 100644 --- a/README.md +++ b/README.md @@ -63,27 +63,37 @@ print(result.terminationStatus) // e.g. exited(0) print(result.standardOutput) // e.g. Optional("LICENSE\nPackage.swift\n...") ``` -This returns an `ExecutionRecord` containing the process identifier, termination status, and collected standard output and standard error. +This returns an `ExecutionResult` containing the process identifier, termination status, and collected standard output and standard error. ### Run with a Custom Closure -For more control, pass a closure that runs while the child process is active. The closure receives an `Execution` handle and, depending on the variant, streams for standard output, standard error, and a writer for standard input. +For more control, pass a closure that runs while the child process is active. The closure receives a single `Execution` value that you use to send signals, write to standard input, and stream standard output and standard error. > [!CAUTION] -> All closure arguments,`Execution`, `AsyncBufferSequence`, and `StandardInputWriter`, are valid only for the duration of the closure's execution and must not be escaped. +> The `Execution`, `AsyncBufferSequence`, and `StandardInputWriter` values are valid only for the duration of the closure. Don't let them escape the closure. +You opt into each interactive stream by choosing the matching input or output type: + +| To do this... | Pass this... | Then read from... | +| --- | --- | --- | +| Write to standard input from the closure | `input: .inputWriter` | `execution.standardInputWriter` | +| Stream standard output | `output: .sequence` | `execution.standardOutput` | +| Stream standard error | `error: .sequence` | `execution.standardError` | Stream standard output line by line: ```swift import Subprocess -let outcome = try await run( +let result = try await run( .path("/usr/bin/tail"), - arguments: ["-f", "/path/to/nginx.log"] -) { execution, outputSequence in - for try await line in outputSequence.lines() { + arguments: ["-f", "/path/to/nginx.log"], + input: .none, + output: .sequence, + error: .discarded +) { execution in + for try await line in execution.standardOutput.strings() { if line.contains("500") { // Oh no, 500 error } @@ -94,73 +104,65 @@ let outcome = try await run( Write to standard input and read from standard output: ```swift -let outcome = try await run(.name("cat")) { execution, inputWriter, outputSequence in - try await inputWriter.write("Hello, Subprocess!\n") - try await inputWriter.finish() - for try await line in outputSequence.lines() { - print(line) // "Hello, Subprocess!" - } +let result = try await run( + .name("cat"), + input: .inputWriter, + output: .sequence, + error: .discarded +) { execution in + async let reading: Void = { + for try await line in execution.standardOutput.strings() { + print(line) // "Hello, Subprocess!" + } + }() + + try await execution.standardInputWriter.write("Hello, Subprocess!\n") + try await execution.standardInputWriter.finish() + try await reading } ``` -The closure-based `run` returns an `ExecutionOutcome` containing both the closure's return value and the termination status. +The closure-based `run` returns an `ExecutionResult`. Access the closure's return value with `result.closureOutput`, and the termination status with `result.terminationStatus`. -`Subprocess` provides several closure variants depending on which streams you need: +Because `input`, `output`, and `error` are separate parameters, you can mix streaming and capturing in the same call. For example, stream standard output from the closure while collecting standard error as a string, and return the closure's own value through `closureOutput`: -* Manage the runnning process without streaming ```swift -run(.path("/my/app")) { execution in - ... -} -``` - -* Manage the running process and stream standard output or standard error -```swift -run(.path("/my/app"), error: .discarded) { execution, outputStream in - for try await item in outputStream { ... } +let result = try await run( + .path("/my/app"), + input: .none, + output: .sequence, + error: .string(limit: 4096) +) { execution in + var lineCount = 0 + for try await _ in execution.standardOutput.lines() { + lineCount += 1 + } + return lineCount } -run(.path("/my/app"), output: .discarded) { execution, errorStream in - for try await item in errorStream { ... } -} +print(result.closureOutput) // The line count returned from the closure. +print(result.standardError ?? "") // The captured standard error. ``` +Stream both standard output and standard error, writing to standard input from the same closure: -* Write to standard input and stream standard output or standard error ```swift -run(.path("/my/app"), output: .discarded) { execution, inputWriter, outputStream in +try await run( + .path("/my/app"), + input: .inputWriter, + output: .sequence, + error: .sequence +) { execution in try await withThrowingTaskGroup { group in - group.addTask { for try await item in outputStream { ... } } group.addTask { - _ = try await inputWriter.write("Hello Subprocess") - try await inputWriter.finish() + for try await line in execution.standardOutput.lines() { /* ... */ } } - try await group.waitForAll() - } -} - - -run(.path("/my/app"), error: .discarded) { execution, inputWriter, errorStream in - try await withThrowingTaskGroup { group in - group.addTask { for try await item in errorStream { ... } } group.addTask { - _ = try await inputWriter.write("Hello Subprocess") - try await inputWriter.finish() + for try await line in execution.standardError.lines() { /* ... */ } } - try await group.waitForAll() - } -} -``` - -* Write to standard input and stream both standard output and standard error -```swift -run(.path("/my/app")) { execution, inputWriter, outputStream, errorStream in - try await withThrowingTaskGroup { group in - group.addTask { for try await item in outputStream { ... } } - group.addTask { for try await item in errorStream { ... } } group.addTask { - _ = try await inputWriter.write("Hello Subprocess") - try await inputWriter.finish() + _ = try await execution.standardInputWriter.write("Hello Subprocess") + try await execution.standardInputWriter.finish() } try await group.waitForAll() } @@ -169,10 +171,10 @@ run(.path("/my/app")) { execution, inputWriter, outputStream, errorStream in In the closure-based API, output streams are delivered as an `AsyncBufferSequence` — an asynchronous sequence of `Buffer` values. Each `Buffer` provides access to its bytes via `withUnsafeBytes(_:)` or the `bytes` property (a `RawSpan`). -The preferred method to convert `Buffer` to `String` is to read output line by line using `.lines()`. You can optionally specify an encoding and buffering policy: +The preferred way to convert `Buffer` to `String` is to read output line by line using `.lines()`. You can optionally specify an encoding and buffering policy: ```swift -for try await line in outputSequence.lines( +for try await line in execution.standardOutput.lines( encoding: UTF16.self, bufferingPolicy: .maxLineLength(1024) ) { @@ -213,7 +215,6 @@ let result = try await run(config, output: .string(limit: 4096)) ``` -Use it by setting `.string(_:)` or `.string(_:using:)` for `input`. ### Input and Output Options By default, `Subprocess`: @@ -232,6 +233,7 @@ For the collected-result API, you must specify how to capture standard output. | `.string(_:)` or `.string(_:using:)` | Read from a string with optional encoding | | `.array(_:)` | Read from a `[UInt8]` array | | `Span` | Read from a span (passed directly as the `input` parameter) | +| `.inputWriter` | Write from the closure via `execution.standardInputWriter` (closure-based `run` only) | | `.data(_:)` | Read from `Data` (requires `SubprocessFoundation`) | | `.sequence(_:)` | Read from a `Sequence` or `AsyncSequence` (requires `SubprocessFoundation`) | @@ -245,6 +247,7 @@ For the collected-result API, you must specify how to capture standard output. | `.currentStandardOutput` or `.currentStandardError` | Write to the parent process's standard output or standard error | | `.string(limit:)` or `.string(limit:encoding:)` | Collect as `String?` | | `.bytes(limit:)` | Collect as `[UInt8]` | +| `.sequence` | Stream to the closure via `execution.standardOutput` or `execution.standardError` (closure-based `run` only) | | `.data(limit:)` | Collect as `Data` (requires `SubprocessFoundation`) | | `.combinedWithOutput` | Merge standard error into the standard output stream (error parameter only) | @@ -273,7 +276,7 @@ let serverTask = Task { .gracefulShutDown(allowedDurationToNextStep: .seconds(5)) ] - let outcome = try await run( + let result = try await run( .name("server"), platformOptions: platformOptions, output: .string(limit: 1024) diff --git a/Sources/Subprocess/API.swift b/Sources/Subprocess/API.swift index 74b8e261..a8382125 100644 --- a/Sources/Subprocess/API.swift +++ b/Sources/Subprocess/API.swift @@ -19,6 +19,7 @@ public import SystemPackage /// Runs an executable asynchronously and returns the collected output /// of the child process. +/// /// - Parameters: /// - executable: The executable to run. /// - arguments: The arguments to pass to the executable. @@ -28,7 +29,7 @@ public import SystemPackage /// - input: The input to send to the executable. /// - output: The method to use for redirecting standard output. /// - error: The method to use for redirecting standard error. -/// - Returns: An ``ExecutionRecord`` that contains the result of the run. +/// - Returns: An ``ExecutionResult`` that contains the result of the run. public func run< Input: InputProtocol, Output: OutputProtocol, @@ -42,7 +43,7 @@ public func run< input: Input = .none, output: Output, error: Error = .discarded -) async throws -> ExecutionRecord { +) async throws -> ExecutionResult { let configuration = Configuration( executable: executable, arguments: arguments, @@ -60,6 +61,7 @@ public func run< /// Runs an executable asynchronously and returns the collected output /// of the child process. +/// /// - Parameters: /// - executable: The executable to run. /// - arguments: The arguments to pass to the executable. @@ -69,7 +71,7 @@ public func run< /// - input: A span to write to the subprocess's standard input. /// - output: The method to use for redirecting standard output. /// - error: The method to use for redirecting standard error. -/// - Returns: An ``ExecutionRecord`` that contains the result of the run. +/// - Returns: An ``ExecutionResult`` that contains the result of the run. public func run< InputElement: BitwiseCopyable, Output: OutputProtocol, @@ -83,7 +85,7 @@ public func run< input: borrowing Span, output: Output, error: Error = .discarded -) async throws -> ExecutionRecord { +) async throws -> ExecutionResult { let configuration = Configuration( executable: executable, arguments: arguments, @@ -101,8 +103,15 @@ public func run< // MARK: - Custom Execution Body -/// Runs an executable with given parameters and a custom closure -/// to manage the running subprocess’s lifetime. +/// Runs an executable asynchronously and lets a closure manage the running subprocess. +/// +/// Use this overload when you need to interact with the subprocess while it runs, +/// such as streaming its standard output, writing to its standard input, or sending +/// signals. The closure runs concurrently with the subprocess and receives an +/// ``Execution`` value you can use to access these capabilities. +/// +/// The subprocess must terminate before this method returns. +/// /// - Parameters: /// - executable: The executable to run. /// - arguments: The arguments to pass to the executable. @@ -110,13 +119,13 @@ public func run< /// - workingDirectory: The working directory in which to run the executable. /// - platformOptions: The platform-specific options to use when running the executable. /// - input: The input to send to the executable. -/// - output: How to manage executable standard output. -/// - error: How to manage executable standard error. -/// - body: A closure to manage the running process. -/// All arguments passed to this closure are valid only for -/// the duration of the closure's execution and must not be escaped. -/// - execution: The running subprocess. -/// - Returns: An ``ExecutionOutcome`` that contains the closure's return value. +/// - output: The method to use for redirecting standard output. +/// - error: The method to use for redirecting standard error. +/// - body: A closure that manages the running subprocess. The closure receives +/// an ``Execution`` value that's valid only for the duration of the call. +/// Don't let the execution value escape the closure. +/// - Returns: An ``ExecutionResult`` that contains the closure's return value and +/// the termination status of the child process. public func run< Result, Input: InputProtocol, @@ -128,100 +137,13 @@ public func run< environment: Environment = .inherit, workingDirectory: FilePath? = nil, platformOptions: PlatformOptions = PlatformOptions(), - input: Input = .none, - output: Output = .discarded, - error: Error = .discarded, - body: (_ execution: Execution) async throws -> Result -) async throws -> ExecutionOutcome where Error.OutputType == Void { - let configuration = Configuration( - executable: executable, - arguments: arguments, - environment: environment, - workingDirectory: workingDirectory, - platformOptions: platformOptions - ) - return try await run( - configuration, - input: input, - output: output, - error: error, - body: body - ) -} - -/// Runs an executable with given parameters and a custom closure to manage the -/// running subprocess's lifetime and stream its standard output. -/// - Parameters: -/// - executable: The executable to run. -/// - arguments: The arguments to pass to the executable. -/// - environment: The environment in which to run the executable. -/// - workingDirectory: The working directory in which to run the executable. -/// - platformOptions: The platform-specific options to use when running the executable. -/// - input: The input to send to the executable. -/// - error: How to manage executable standard error. -/// - body: A closure to manage the running process. -/// All arguments passed to this closure are valid only for -/// the duration of the closure's execution and must not be escaped. -/// - execution: The running subprocess. -/// - outputSequence: The standard output as an asynchronous sequence of buffers. -/// - Returns: An ``ExecutionOutcome`` that contains the closure's return value. -public func run( - _ executable: Executable, - arguments: Arguments = [], - environment: Environment = .inherit, - workingDirectory: FilePath? = nil, - platformOptions: PlatformOptions = PlatformOptions(), - input: Input = .none, - error: Error = .discarded, - body: ( - _ execution: Execution, - _ outputSequence: AsyncBufferSequence - ) async throws -> Result -) async throws -> ExecutionOutcome where Error.OutputType == Void { - let configuration = Configuration( - executable: executable, - arguments: arguments, - environment: environment, - workingDirectory: workingDirectory, - platformOptions: platformOptions - ) - return try await run( - configuration, - input: input, - error: error, - body: body - ) -} - -/// Runs an executable with given parameters and a custom closure to manage the -/// running subprocess's lifetime and stream its standard error. -/// - Parameters: -/// - executable: The executable to run. -/// - arguments: The arguments to pass to the executable. -/// - environment: The environment in which to run the executable. -/// - workingDirectory: The working directory in which to run the executable. -/// - platformOptions: The platform-specific options to use when running the executable. -/// - input: The input to send to the executable. -/// - output: How to manage executable standard output. -/// - body: A closure to manage the running process. -/// All arguments passed to this closure are valid only for -/// the duration of the closure's execution and must not be escaped. -/// - execution: The running subprocess. -/// - errorSequence: The standard error as an asynchronous sequence of buffers. -/// - Returns: An ``ExecutionOutcome`` that contains the closure's return value. -public func run( - _ executable: Executable, - arguments: Arguments = [], - environment: Environment = .inherit, - workingDirectory: FilePath? = nil, - platformOptions: PlatformOptions = PlatformOptions(), - input: Input = .none, + input: Input, output: Output, + error: Error, body: ( - _ execution: Execution, - _ errorSequence: AsyncBufferSequence + Execution ) async throws -> Result -) async throws -> ExecutionOutcome where Output.OutputType == Void { +) async throws -> ExecutionResult { let configuration = Configuration( executable: executable, arguments: arguments, @@ -233,199 +155,22 @@ public func run( configuration, input: input, output: output, - body: body - ) -} - -/// Runs an executable with given parameters and a custom closure to manage the -/// running subprocess's lifetime and stream its standard output and error. -/// - Parameters: -/// - executable: The executable to run. -/// - arguments: The arguments to pass to the executable. -/// - environment: The environment in which to run the executable. -/// - workingDirectory: The working directory in which to run the executable. -/// - platformOptions: The platform-specific options to use when running the executable. -/// - input: The input to send to the executable. -/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading -/// from the subprocess's standard error stream. If `nil`, uses the system page size -/// as the default buffer size. Larger buffer sizes may improve performance for -/// subprocesses that produce large amounts of output, while smaller buffer sizes -/// may reduce memory usage and improve responsiveness for interactive applications. -/// - isolation: The isolation context to run the body closure. -/// - body: A closure to manage the running process. -/// All arguments passed to this closure are valid only for -/// the duration of the closure's execution and must not be escaped. -/// - execution: The running subprocess. -/// - errorSequence: The standard error as an asynchronous sequence of buffers. -/// - Returns: An ``ExecutionOutcome`` that contains the closure's return value. -public func run( - _ executable: Executable, - arguments: Arguments = [], - environment: Environment = .inherit, - workingDirectory: FilePath? = nil, - platformOptions: PlatformOptions = PlatformOptions(), - input: Input = .none, - preferredBufferSize: Int? = nil, - body: ( - _ execution: Execution, - _ outputSequence: AsyncBufferSequence, - _ errorSequence: AsyncBufferSequence - ) async throws -> Result -) async throws -> ExecutionOutcome { - let configuration = Configuration( - executable: executable, - arguments: arguments, - environment: environment, - workingDirectory: workingDirectory, - platformOptions: platformOptions - ) - return try await run( - configuration, - input: input, - preferredBufferSize: preferredBufferSize, - body: body - ) -} - -/// Runs an executable with given parameters and a custom closure to manage the -/// running subprocess's lifetime, write to its standard input, and stream its standard output. -/// - Parameters: -/// - executable: The executable to run. -/// - arguments: The arguments to pass to the executable. -/// - environment: The environment in which to run the executable. -/// - workingDirectory: The working directory in which to run the executable. -/// - platformOptions: The platform-specific options to use when running the executable. -/// - error: How to manage executable standard error. -/// - body: A closure to manage the running process. -/// All arguments passed to this closure are valid only for -/// the duration of the closure's execution and must not be escaped. -/// - execution: The running subprocess. -/// - inputWriter: A writer for the subprocess's standard input. -/// - outputSequence: The standard output as an asynchronous sequence of buffers. -/// - Returns: An ``ExecutionOutcome`` that contains the closure's return value. -public func run( - _ executable: Executable, - arguments: Arguments = [], - environment: Environment = .inherit, - workingDirectory: FilePath? = nil, - platformOptions: PlatformOptions = PlatformOptions(), - error: Error = .discarded, - body: ( - _ execution: Execution, - _ inputWriter: StandardInputWriter, - _ outputSequence: AsyncBufferSequence - ) async throws -> Result -) async throws -> ExecutionOutcome where Error.OutputType == Void { - let configuration = Configuration( - executable: executable, - arguments: arguments, - environment: environment, - workingDirectory: workingDirectory, - platformOptions: platformOptions - ) - return try await run( - configuration, error: error, body: body ) } -/// Runs an executable with given parameters and a custom closure to manage the -/// running subprocess's lifetime, write to its standard input, and stream its standard error. -/// - Parameters: -/// - executable: The executable to run. -/// - arguments: The arguments to pass to the executable. -/// - environment: The environment in which to run the executable. -/// - workingDirectory: The working directory in which to run the executable. -/// - platformOptions: The platform-specific options to use when running the executable. -/// - output: How to manage executable standard output. -/// - body: A closure to manage the running process. -/// All arguments passed to this closure are valid only for -/// the duration of the closure's execution and must not be escaped. -/// - execution: The running subprocess. -/// - inputWriter: A writer for the subprocess's standard input. -/// - errorSequence: The standard error as an asynchronous sequence of buffers. -/// - Returns: An ``ExecutionOutcome`` that contains the closure's return value. -public func run( - _ executable: Executable, - arguments: Arguments = [], - environment: Environment = .inherit, - workingDirectory: FilePath? = nil, - platformOptions: PlatformOptions = PlatformOptions(), - output: Output, - body: ( - _ execution: Execution, - _ inputWriter: StandardInputWriter, - _ errorSequence: AsyncBufferSequence - ) async throws -> Result -) async throws -> ExecutionOutcome where Output.OutputType == Void { - let configuration = Configuration( - executable: executable, - arguments: arguments, - environment: environment, - workingDirectory: workingDirectory, - platformOptions: platformOptions - ) - return try await run( - configuration, - output: output, - body: body - ) -} - -/// Runs an executable with given parameters and a custom closure -/// to manage the running subprocess’s lifetime, write to its -/// standard input, and stream its standard output and standard error. -/// - Parameters: -/// - executable: The executable to run. -/// - arguments: The arguments to pass to the executable. -/// - environment: The environment in which to run the executable. -/// - workingDirectory: The working directory in which to run the executable. -/// - platformOptions: The platform-specific options to use when running the executable. -/// - body: A closure to manage the running process. -/// All arguments passed to this closure are valid only for -/// the duration of the closure's execution and must not be escaped. -/// - execution: The running subprocess. -/// - inputWriter: A writer for the subprocess's standard input. -/// - outputSequence: The standard output as an asynchronous sequence of buffers. -/// - errorSequence: The standard error as an asynchronous sequence of buffers. -/// - Returns: An ``ExecutionOutcome`` that contains the closure's return value. -public func run( - _ executable: Executable, - arguments: Arguments = [], - environment: Environment = .inherit, - workingDirectory: FilePath? = nil, - platformOptions: PlatformOptions = PlatformOptions(), - body: ( - _ execution: Execution, - _ inputWriter: StandardInputWriter, - _ outputSequence: AsyncBufferSequence, - _ errorSequence: AsyncBufferSequence - ) async throws -> Result -) async throws -> ExecutionOutcome { - let configuration = Configuration( - executable: executable, - arguments: arguments, - environment: environment, - workingDirectory: workingDirectory, - platformOptions: platformOptions - ) - return try await run( - configuration, - body: body - ) -} - // MARK: - Configuration Based -/// Runs a configuration asynchronously and returns -/// an ``ExecutionRecord`` that contains the output of the child process. +/// Runs a configuration asynchronously and returns an ``ExecutionResult`` that +/// contains the output of the child process. +/// /// - Parameters: /// - configuration: The configuration to run. /// - input: A span to write to the subprocess's standard input. /// - output: The method to use for redirecting standard output. /// - error: The method to use for redirecting standard error. -/// - Returns: An ``ExecutionRecord`` that contains the result of the run. +/// - Returns: An ``ExecutionResult`` that contains the result of the run. public func run< InputElement: BitwiseCopyable, Output: OutputProtocol, @@ -435,57 +180,28 @@ public func run< input: borrowing Span, output: Output, error: Error = .discarded -) async throws -> ExecutionRecord { - typealias RunResult = ( - processIdentifier: ProcessIdentifier, - standardOutput: Output.OutputType, - standardError: Error.OutputType - ) - - let customInput = CustomWriteInput() - - let result = try await configuration.run( - input: try customInput.createPipe(), - output: try output.createPipe(), - error: try error.createPipe(), - ) { execution, inputIO, outputIO, errorIO in - var inputIOBox: IODescriptor? = consume inputIO - var outputIOBox: IODescriptor? = consume outputIO - var errorIOBox: IODescriptor? = consume errorIO - - // Write input, capture output and error in parallel - async let stdout = try output.captureOutput(from: outputIOBox.take()) - async let stderr = try error.captureOutput(from: errorIOBox.take()) - // Write span at the same isolation - if let writeFd = inputIOBox.take() { - let writer = StandardInputWriter(diskIO: writeFd) - _ = try await writer.write(input._bytes) - try await writer.finish() - } - - return ( - processIdentifier: execution.processIdentifier, - standardOutput: try await stdout, - standardError: try await stderr - ) +) async throws -> ExecutionResult { + let inputMethod = CustomWriteInput() + return try await run( + configuration, + input: inputMethod, + output: output, + error: error + ) { execution in + _ = try await execution.standardInputWriter.write(input._bytes) + try await execution.standardInputWriter.finish() } - - return ExecutionRecord( - processIdentifier: result.value.processIdentifier, - terminationStatus: result.terminationStatus, - standardOutput: result.value.standardOutput, - standardError: result.value.standardError - ) } /// Runs a ``Configuration`` asynchronously and returns -/// an ``ExecutionRecord`` that contains the output of the child process. +/// an ``ExecutionResult`` that contains the output of the child process. +/// /// - Parameters: /// - configuration: The configuration to run. /// - input: The input to send to the executable. /// - output: The method to use for redirecting standard output. /// - error: The method to use for redirecting standard error. -/// - Returns: An ``ExecutionRecord`` that contains the result of the run. +/// - Returns: An ``ExecutionResult`` that contains the result of the run. public func run< Input: InputProtocol, Output: OutputProtocol, @@ -495,104 +211,32 @@ public func run< input: Input = .none, output: Output, error: Error = .discarded -) async throws -> ExecutionRecord { - typealias RunResult = ( - processIdentifier: ProcessIdentifier, - standardOutput: Output.OutputType, - standardError: Error.OutputType - ) - let inputPipe = try input.createPipe() - let outputPipe = try output.createPipe() - let errorPipe = try error.createPipe(from: outputPipe) - - let result = try await configuration.run( - input: inputPipe, - output: outputPipe, - error: errorPipe - ) { execution, inputIO, outputIO, errorIO in - // Write input, capture output and error in parallel - var inputIOBox: IODescriptor? = consume inputIO - var outputIOBox: IODescriptor? = consume outputIO - var errorIOBox: IODescriptor? = consume errorIO - return try await withThrowingTaskGroup( - of: OutputCapturingState?.self, - returning: RunResult.self - ) { group in - var inputIOContainer: IODescriptor? = inputIOBox.take() - var outputIOContainer: IODescriptor? = outputIOBox.take() - var errorIOContainer: IODescriptor? = errorIOBox.take() - group.addTask { - if let writeFd = inputIOContainer.take() { - let writer = StandardInputWriter(diskIO: writeFd) - try await input.write(with: writer) - try await writer.finish() - } - return nil - } - group.addTask { - let stdout = try await output.captureOutput( - from: outputIOContainer.take() - ) - return .standardOutputCaptured(stdout) - } - group.addTask { - let stderr = try await error.captureOutput( - from: errorIOContainer.take() - ) - return .standardErrorCaptured(stderr) - } - - do { - var stdout: Output.OutputType! - var stderror: Error.OutputType! - while let state = try await group.next() { - switch state { - case .standardOutputCaptured(let output): - stdout = output - case .standardErrorCaptured(let error): - stderror = error - case .none: - continue - } - } - - return ( - processIdentifier: execution.processIdentifier, - standardOutput: stdout, - standardError: stderror - ) - } catch { - if let underlying = error as? SubprocessError.UnderlyingError { - throw SubprocessError.asyncIOFailed( - reason: "Failed to capture output", - underlyingError: underlying - ) - } - throw error - } - } +) async throws -> ExecutionResult { + return try await run(configuration, input: input, output: output, error: error) { _ in + return () as Void } - - return ExecutionRecord( - processIdentifier: result.value.processIdentifier, - terminationStatus: result.terminationStatus, - standardOutput: result.value.standardOutput, - standardError: result.value.standardError - ) } -/// Runs an executable with a given ``Configuration`` and a custom closure -/// to manage the running subprocess's lifetime. +/// Runs a ``Configuration`` asynchronously and lets a closure manage the running +/// subprocess. +/// +/// Use this overload when you need to interact with the subprocess while it runs, +/// such as streaming its standard output, writing to its standard input, or sending +/// signals. The closure runs concurrently with the subprocess and receives an +/// ``Execution`` value you can use to access these capabilities. +/// +/// The subprocess must terminate before this method returns. +/// /// - Parameters: /// - configuration: The configuration to run. /// - input: The input to send to the executable. -/// - output: How to manage executable standard output. -/// - error: How to manage executable standard error. -/// - body: A closure to manage the running process. -/// All arguments passed to this closure are valid only for -/// the duration of the closure's execution and must not be escaped. -/// - execution: The running subprocess. -/// - Returns: An ``ExecutionOutcome`` that contains the closure's return value. +/// - output: The method to use for redirecting standard output. +/// - error: The method to use for redirecting standard error. +/// - body: A closure that manages the running subprocess. The closure receives +/// an ``Execution`` value that's valid only for the duration of the call. +/// Don't let the execution value escape the closure. +/// - Returns: An ``ExecutionResult`` that contains the closure's return value and +/// the termination status of the child process. public func run< Result, Input: InputProtocol, @@ -600,375 +244,139 @@ public func run< Error: ErrorOutputProtocol >( _ configuration: Configuration, - input: Input = .none, - output: Output = .discarded, - error: Error = .discarded, - body: (_ execution: Execution) async throws -> Result -) async throws -> ExecutionOutcome where Error.OutputType == Void { - let inputPipe = try input.createPipe() - let outputPipe = try output.createPipe() - let errorPipe = try error.createPipe(from: outputPipe) - - return try await configuration.run( - input: inputPipe, - output: outputPipe, - error: errorPipe - ) { execution, inputIO, outputIO, errorIO in - var inputIOBox: IODescriptor? = consume inputIO - var outputIOBox: IODescriptor? = consume outputIO - var errorIOBox: IODescriptor? = consume errorIO - try outputIOBox?.safelyClose() - try errorIOBox?.safelyClose() - - return try await withThrowingTaskGroup( - of: Void.self, - returning: Result.self - ) { group in - var inputIOContainer: IODescriptor? = inputIOBox.take() - group.addTask { - if let inputIO = inputIOContainer.take() { - let writer = StandardInputWriter(diskIO: inputIO) - try await input.write(with: writer) - try await writer.finish() - } - } - - // Body runs in the same isolation - let result = try await body(execution) - - try await group.waitForAll() - return result - } - } -} - -/// Runs an executable with a given ``Configuration`` and a custom closure -/// to manage the running subprocess's lifetime and stream its standard output. -/// - Parameters: -/// - configuration: The configuration to run. -/// - input: The input to send to the executable. -/// - error: How to manage executable standard error. -/// - body: A closure to manage the running process. -/// All arguments passed to this closure are valid only for -/// the duration of the closure's execution and must not be escaped. -/// - execution: The running subprocess. -/// - outputSequence: The standard output as an asynchronous sequence of buffers. -/// - Returns: An ``ExecutionOutcome`` that contains the closure's return value. -public func run< - Result, - Input: InputProtocol, - Error: ErrorOutputProtocol ->( - _ configuration: Configuration, - input: Input = .none, - error: Error = .discarded, + input: Input, + output: Output, + error: Error, body: ( - _ execution: Execution, - _ outputSequence: AsyncBufferSequence + Execution ) async throws -> Result -) async throws -> ExecutionOutcome where Error.OutputType == Void { - let output = SequenceOutput() - let inputPipe = try input.createPipe() +) async throws -> ExecutionResult { + typealias RunResult = ( + processIdentifier: ProcessIdentifier, + closureResult: Result, + output: Output.OutputType, + error: Error.OutputType + ) + let outputPipe = try output.createPipe() let errorPipe = try error.createPipe(from: outputPipe) - - return try await configuration.run( - input: inputPipe, + let result: ExecutionOutcome = try await configuration.run( + input: try input.createPipe(), + as: Input.self, output: outputPipe, - error: errorPipe - ) { execution, inputIO, outputIO, errorIO in - var inputIOBox: IODescriptor? = consume inputIO - var outputIOBox: IODescriptor? = consume outputIO - var errorIOBox: IODescriptor? = consume errorIO - try errorIOBox?.safelyClose() + as: Output.self, + error: errorPipe, + as: Error.self + ) { processIdentifier, inputIO, outputIO, errorIO in + var inputIOBox = consume inputIO + var outputIOBox = consume outputIO + var errorIOBox = consume errorIO - return try await withThrowingTaskGroup( - of: Void.self, - returning: Result.self - ) { group in - var inputIOContainer: IODescriptor? = inputIOBox.take() - group.addTask { - if let inputIO = inputIOContainer.take() { - let writer = StandardInputWriter(diskIO: inputIO) - try await input.write(with: writer) - try await writer.finish() + return try await withThrowingTaskGroup(of: _RunGroupResult.self) { group in + var writer: StandardInputWriter? + if inputIOBox != nil { + let inputWriter = StandardInputWriter(diskIO: inputIOBox.take()!) + writer = inputWriter + + if Input.self != CustomWriteInput.self { + // Write non-custom inputs in a parallel task. + group.addTask { + try await input.write(with: inputWriter) + try await inputWriter.finish() + return .inputWritten + } } } - // Body runs in the same isolation - let outputSequence = AsyncBufferSequence( - diskIO: outputIOBox!.consumeDescriptor() - ) - - let result = try await body(execution, outputSequence) - try await group.waitForAll() - return result - } - } -} - -/// Runs an executable with a given ``Configuration`` and a custom closure -/// to manage the running subprocess's lifetime and stream its standard error. -/// - Parameters: -/// - configuration: The configuration to run. -/// - input: The input to send to the executable. -/// - output: How to manage executable standard output. -/// - body: A closure to manage the running process. -/// All arguments passed to this closure are valid only for -/// the duration of the closure's execution and must not be escaped. -/// - execution: The running subprocess. -/// - errorSequence: The standard error as an asynchronous sequence of buffers. -/// - Returns: An ``ExecutionOutcome`` that contains the closure's return value. -public func run( - _ configuration: Configuration, - input: Input = .none, - output: Output, - body: ( - _ execution: Execution, - _ errorSequence: AsyncBufferSequence - ) async throws -> Result -) async throws -> ExecutionOutcome where Output.OutputType == Void { - let error = SequenceOutput() - - return try await configuration.run( - input: try input.createPipe(), - output: try output.createPipe(), - error: try error.createPipe(), - ) { execution, inputIO, outputIO, errorIO in - var inputIOBox: IODescriptor? = consume inputIO - var outputIOBox: IODescriptor? = consume outputIO - var errorIOBox: IODescriptor? = consume errorIO - try outputIOBox?.safelyClose() - - return try await withThrowingTaskGroup( - of: Void.self, - returning: Result.self - ) { group in - var inputIOContainer: IODescriptor? = inputIOBox.take() - group.addTask { - if let inputIO = inputIOContainer.take() { - let writer = StandardInputWriter(diskIO: inputIO) - try await input.write(with: writer) - try await writer.finish() + var outputSequence: AsyncBufferSequence? = nil + var errorSequence: AsyncBufferSequence? = nil + // Capture output and error in parallel + if Output.self == SequenceOutput.self { + var diskIO = outputIOBox.take() + outputSequence = AsyncBufferSequence( + diskIO: diskIO!.consumeDescriptor() + ) + } else if Output.OutputType.self == Void.self { + // No need to capture output + var diskIO = outputIOBox.take() + try diskIO?.safelyClose() + } else { + var diskIO = outputIOBox.take() + group.addTask { + let result = try await output.captureOutput(from: diskIO.take()) + return .standardOutputCaptured(result) } } - let errorSequence = AsyncBufferSequence( - diskIO: errorIOBox!.consumeDescriptor() - ) - // Body runs in the same isolation - let result = try await body(execution, errorSequence) - try await group.waitForAll() - return result - } - } -} - -/// Runs an executable with a given ``Configuration`` and a custom closure -/// to manage the running subprocess's lifetime and stream its standard output and error. -/// - Parameters: -/// - configuration: The configuration to run. -/// - input: The input to send to the executable. -/// - output: How to manage executable standard output. -/// - preferredBufferSize: The preferred size in bytes for the buffer used when reading -/// from the subprocess's standard error stream. If `nil`, uses the system page size -/// as the default buffer size. Larger buffer sizes may improve performance for -/// subprocesses that produce large amounts of output, while smaller buffer sizes -/// may reduce memory usage and improve responsiveness for interactive applications. -/// - isolation: The isolation context to run the body closure. -/// - body: A closure to manage the running process. -/// All arguments passed to this closure are valid only for -/// the duration of the closure's execution and must not be escaped. -/// - execution: The running subprocess. -/// - outputSequence: The standard output an asynchronous sequence of buffers. -/// - errorSequence: The standard error as an asynchronous sequence of buffers. -/// - Returns: An ``ExecutionOutcome`` that contains the closure's return value. -public func run( - _ configuration: Configuration, - input: Input = .none, - preferredBufferSize: Int? = nil, - body: ( - _ execution: Execution, - _ outputSequence: AsyncBufferSequence, - _ errorSequence: AsyncBufferSequence - ) async throws -> Result -) async throws -> ExecutionOutcome { - let output = SequenceOutput() - let error = SequenceOutput() - - return try await configuration.run( - input: try input.createPipe(), - output: try output.createPipe(), - error: try error.createPipe(), - ) { execution, inputIO, outputIO, errorIO in - var inputIOBox: IODescriptor? = consume inputIO - var outputIOBox: IODescriptor? = consume outputIO - var errorIOBox: IODescriptor? = consume errorIO - - return try await withThrowingTaskGroup( - of: Void.self, - returning: Result.self - ) { group in - var inputIOContainer: IODescriptor? = inputIOBox.take() - group.addTask { - if let inputIO = inputIOContainer.take() { - let writer = StandardInputWriter(diskIO: inputIO) - try await input.write(with: writer) - try await writer.finish() + if Error.self == SequenceOutput.self { + var diskIO = errorIOBox.take() + errorSequence = AsyncBufferSequence( + diskIO: diskIO!.consumeDescriptor() + ) + } else if Error.OutputType.self == Void.self { + // No need to capture error + var diskIO = errorIOBox.take() + try diskIO?.safelyClose() + } else { + var diskIO = errorIOBox.take() + group.addTask { + let result = try await error.captureOutput(from: diskIO.take()) + return .standardErrorCaptured(result) } } - // Body runs in the same isolation - let outputSequence = AsyncBufferSequence( - diskIO: outputIOBox!.consumeDescriptor() - ) - - let errorSequence = AsyncBufferSequence( - diskIO: errorIOBox!.consumeDescriptor() + let execution = Execution( + processIdentifier: processIdentifier, + inputWriter: writer, + outputStream: outputSequence, + errorStream: errorSequence ) + let result: Result + do { + result = try await body(execution) + } catch { + if Input.self == CustomWriteInput.self { + try await writer?.finish() + } + throw error + } + if Input.self == CustomWriteInput.self { + try await writer?.finish() + } - let result = try await body(execution, outputSequence, errorSequence) - try await group.waitForAll() - return result + var capturedOutput: Output.OutputType? + var capturedError: Error.OutputType? + while let groupResult = try await group.next() { + switch groupResult { + case .inputWritten: + continue + case .standardOutputCaptured(let output): + capturedOutput = output + case .standardErrorCaptured(let error): + capturedError = error + } + } + if Output.OutputType.self == Void.self { + capturedOutput = (() as Any) as? Output.OutputType + } + if Error.OutputType.self == Void.self { + capturedError = (() as Any) as? Error.OutputType + } + return (processIdentifier, result, capturedOutput!, capturedError!) } } -} -/// Runs an executable with a given ``Configuration`` and a custom closure -/// to manage the running subprocess's lifetime, write to its -/// standard input, and stream its standard output. -/// - Parameters: -/// - configuration: The configuration to run. -/// - error: How to manage executable standard error. -/// - body: A closure to manage the running process. -/// All arguments passed to this closure are valid only for -/// the duration of the closure's execution and must not be escaped. -/// - execution: The running subprocess. -/// - inputWriter: A writer for the subprocess's standard input. -/// - outputSequence: The standard output as an asynchronous sequence of buffers. -/// - Returns: An ``ExecutionOutcome`` that contains the closure's return value. -public func run( - _ configuration: Configuration, - error: Error = .discarded, - body: ( - _ execution: Execution, - _ inputWriter: StandardInputWriter, - _ outputSequence: AsyncBufferSequence - ) async throws -> Result -) async throws -> ExecutionOutcome where Error.OutputType == Void { - let input = CustomWriteInput() - let output = SequenceOutput() - let inputPipe = try input.createPipe() - let outputPipe = try output.createPipe() - let errorPipe = try error.createPipe(from: outputPipe) - - return try await configuration.run( - input: inputPipe, - output: outputPipe, - error: errorPipe - ) { execution, inputIO, outputIO, errorIO in - var outputIOBox = consume outputIO - var errorIOBox = consume errorIO - try errorIOBox?.safelyClose() - - let writer = StandardInputWriter(diskIO: inputIO!) - let outputSequence = AsyncBufferSequence( - diskIO: outputIOBox!.consumeDescriptor() - ) - - let result = try await body(execution, writer, outputSequence) - try await writer.finish() - return result - } -} - -/// Runs an executable with a given ``Configuration`` and a custom closure -/// to manage the running subprocess's lifetime, write to its -/// standard input, and stream its standard error. -/// - Parameters: -/// - configuration: The configuration to run. -/// - output: How to manage executable standard output. -/// - body: A closure to manage the running process. -/// All arguments passed to this closure are valid only for -/// the duration of the closure's execution and must not be escaped. -/// - execution: The running subprocess. -/// - inputWriter: A writer for the subprocess's standard input. -/// - errorSequence: The standard error as an asynchronous sequence of buffers. -/// - Returns: An ``ExecutionOutcome`` that contains the closure's return value. -public func run( - _ configuration: Configuration, - output: Output, - body: ( - _ execution: Execution, - _ inputWriter: StandardInputWriter, - _ errorSequence: AsyncBufferSequence - ) async throws -> Result -) async throws -> ExecutionOutcome where Output.OutputType == Void { - let input = CustomWriteInput() - let error = SequenceOutput() - - return try await configuration.run( - input: try input.createPipe(), - output: try output.createPipe(), - error: try error.createPipe(), - ) { execution, inputIO, outputIO, errorIO in - var outputIOBox = consume outputIO - var errorIOBox = consume errorIO - try outputIOBox?.safelyClose() - - let writer = StandardInputWriter(diskIO: inputIO!) - let errorSequence = AsyncBufferSequence( - diskIO: errorIOBox!.consumeDescriptor() - ) - let bodyResult = try await body(execution, writer, errorSequence) - try await writer.finish() - return bodyResult - } + return ExecutionResult( + processIdentifier: result.value.processIdentifier, + terminationStatus: result.terminationStatus, + closureOutput: result.value.closureResult, + standardOutput: result.value.output, + standardError: result.value.error + ) } -/// Runs an executable with a given ``Configuration`` -/// and a custom closure to manage the running subprocess's lifetime, write to its -/// standard input, and stream its standard output and standard error. -/// - Parameters: -/// - configuration: The configuration to run. -/// - body: A closure to manage the running process. -/// All arguments passed to this closure are valid only for -/// the duration of the closure's execution and must not be escaped. -/// - execution: The running subprocess. -/// - inputWriter: A writer for the subprocess's standard input. -/// - outputSequence: The standard output as an asynchronous sequence of buffers. -/// - errorSequence: The standard error as an asynchronous sequence of buffers. -/// - Returns: An ``ExecutionOutcome`` that contains the closure's return value. -public func run( - _ configuration: Configuration, - body: ( - _ execution: Execution, - _ inputWriter: StandardInputWriter, - _ outputSequence: AsyncBufferSequence, - _ errorSequence: AsyncBufferSequence - ) async throws -> Result -) async throws -> ExecutionOutcome { - let input = CustomWriteInput() - let output = SequenceOutput() - let error = SequenceOutput() - - return try await configuration.run( - input: try input.createPipe(), - output: try output.createPipe(), - error: try error.createPipe() - ) { execution, inputIO, outputIO, errorIO in - var outputIOBox = consume outputIO - var errorIOBox = consume errorIO - - let writer = StandardInputWriter(diskIO: inputIO!) - let outputSequence = AsyncBufferSequence( - diskIO: outputIOBox!.consumeDescriptor() - ) - let errorSequence = AsyncBufferSequence( - diskIO: errorIOBox!.consumeDescriptor() - ) - let result = try await body(execution, writer, outputSequence, errorSequence) - try await writer.finish() - return result - } +private enum _RunGroupResult { + case standardOutputCaptured(Output.OutputType) + case standardErrorCaptured(Error.OutputType) + case inputWritten } diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index 958dee76..fbdffbae 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -31,6 +31,14 @@ internal import FoundationEssentials #endif #endif +#if canImport(Synchronization) +import Synchronization +#endif + +#if canImport(os) +import os +#endif + /// An asynchronous sequence of buffers that streams output from a subprocess. public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { /// The failure type for the asynchronous sequence. @@ -90,13 +98,18 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { } private let diskIO: DiskIO + private let state: State internal init(diskIO: DiskIO) { self.diskIO = diskIO + self.state = State() } /// Creates an iterator for this asynchronous sequence. public func makeAsyncIterator() -> Iterator { + guard self.state.initializedCount() == 1 else { + fatalError("AsyncBufferSequence is single pass. It can only be iterated once.") + } return Iterator(diskIO: self.diskIO) } @@ -134,7 +147,7 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { /// - encoding: The Unicode encoding to decode with. /// - Returns: A ``StringSequence`` that iterates through /// the buffer contents as strings. - public func strings( + public func strings( separatedBy separator: StringSequence.Separator = .lineBreaks, bufferingPolicy: StringSequence.BufferingPolicy = .maxLineLength(128 * 1024), as encoding: Encoding.Type, @@ -151,6 +164,35 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { @available(*, unavailable) extension AsyncBufferSequence.Iterator: Sendable {} +extension AsyncBufferSequence { + private final class State { + #if os(macOS) + private let value: OSAllocatedUnfairLock + #else + private let value: Atomic + #endif + + init() { + #if os(macOS) + self.value = OSAllocatedUnfairLock(initialState: 0) + #else + self.value = Atomic(0) + #endif + } + + func initializedCount() -> Int { + #if os(macOS) + return self.value.withLock { state in + state += 1 + return state + } + #else + return self.value.add(1, ordering: .sequentiallyConsistent).newValue + #endif + } + } +} + // MARK: - StringSequence extension AsyncBufferSequence { /// An asynchronous sequence of strings parsed from a buffer @@ -180,7 +222,7 @@ extension AsyncBufferSequence { /// ``Separator/unicodeScalarSequence(_:)``, the sequence performs a /// code-unit-level comparison without Unicode normalization. /// See ``Separator/unicodeScalarSequence(_:)`` for details. - public struct StringSequence: AsyncSequence, Sendable { + public struct StringSequence: AsyncSequence, Sendable { /// The element type for the asynchronous sequence. public typealias Element = String @@ -189,7 +231,7 @@ extension AsyncBufferSequence { private let separator: Separator /// An iterator for ``StringSequence``. - public struct AsyncIterator: AsyncIteratorProtocol { + public struct AsyncIterator: nonisolated AsyncIteratorProtocol { /// The element type for this Iterator. public typealias Element = String diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index a8c6802a..fb6fbbfe 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -82,13 +82,16 @@ public struct Configuration: Sendable { ) } - internal func run( + internal func run( input: consuming CreatedPipe, + as inputType: Input.Type, output: consuming CreatedPipe, + as outputType: Output.Type, error: consuming CreatedPipe, + as errorType: Error.Type, _ body: ( ( - Execution, + ProcessIdentifier, consuming IODescriptor?, consuming IODescriptor?, consuming IODescriptor? @@ -101,10 +104,11 @@ public struct Configuration: Sendable { errorPipe: error ) - let execution = spawnResults.execution + let processIdentifier = spawnResults.processIdentifier + defer { // Close process file descriptor now we finished monitoring - execution.processIdentifier.close() + processIdentifier.close() } return try await withAsyncTaskCleanupHandler { () throws -> ExecutionOutcome in @@ -112,10 +116,10 @@ public struct Configuration: Sendable { let outputIO = spawnResults.outputReadEnd() let errorIO = spawnResults.errorReadEnd() - let result: Swift.Result + let result: Swift.Result do { // Body runs in the same isolation - let bodyResult = try await body(execution, inputIO, outputIO, errorIO) + let bodyResult = try await body(processIdentifier, inputIO, outputIO, errorIO) result = .success(bodyResult) } catch { @@ -128,7 +132,7 @@ public struct Configuration: Sendable { // process table which will cause the process termination monitoring thread // to effectively hang due to the pid never being awaited let terminationStatus = try await monitorProcessTermination( - for: execution.processIdentifier + for: processIdentifier ) return ExecutionOutcome( @@ -136,6 +140,12 @@ public struct Configuration: Sendable { value: try result.get() ) } onCleanup: { + let execution = Execution( + processIdentifier: processIdentifier, + inputWriter: nil, + outputStream: nil, + errorStream: nil + ) // Attempt to terminate the child process await execution.runTeardownSequence( self.platformOptions.teardownSequence @@ -655,18 +665,18 @@ extension Configuration { /// by `spawn()`. It returns the parent side file descriptors /// via `SpawnResult` to perform actual reads internal struct SpawnResult: ~Copyable { - let execution: Execution + let processIdentifier: ProcessIdentifier var _inputWriteEnd: IODescriptor? var _outputReadEnd: IODescriptor? var _errorReadEnd: IODescriptor? init( - execution: Execution, + processIdentifier: ProcessIdentifier, inputWriteEnd: consuming IODescriptor?, outputReadEnd: consuming IODescriptor?, errorReadEnd: consuming IODescriptor? ) { - self.execution = execution + self.processIdentifier = processIdentifier self._inputWriteEnd = consume inputWriteEnd self._outputReadEnd = consume outputReadEnd self._errorReadEnd = consume errorReadEnd @@ -925,7 +935,7 @@ internal enum PipeNameCounter { } #endif -internal struct CreatedPipe: ~Copyable { +internal struct CreatedPipe: ~Copyable, Sendable { internal enum Purpose: CustomStringConvertible { /// This pipe is used for standard input. This option maps to /// `PIPE_ACCESS_OUTBOUND` on Windows where child only reads, diff --git a/Sources/Subprocess/Execution.swift b/Sources/Subprocess/Execution.swift index a1126614..45e7bb1d 100644 --- a/Sources/Subprocess/Execution.swift +++ b/Sources/Subprocess/Execution.swift @@ -29,16 +29,79 @@ import Musl /// A running subprocess. /// -/// Use this type to send signals to the child process -/// and to stream its output and error. -public struct Execution: Sendable { +/// Use this type to send signals to the child process, write to its standard +/// input, and stream its standard output and standard error. +/// +/// The three generic parameters determine which streaming properties are +/// available. The ``standardInputWriter`` property is available when `Input` +/// is ``CustomWriteInput``, the ``standardOutput`` property is available +/// when `Output` is ``SequenceOutput``, and the ``standardError`` property +/// is available when `Error` is ``SequenceOutput``. +/// +/// You receive an `Execution` value from the body closure of a `run` +/// function. The value is valid only for the duration of the closure. +/// Don't let the execution value escape the closure. +public struct Execution: Sendable { /// The process identifier of this subprocess. public let processIdentifier: ProcessIdentifier + private let inputWriter: StandardInputWriter? + private let outputStream: AsyncBufferSequence? + private let errorStream: AsyncBufferSequence? + init( - processIdentifier: ProcessIdentifier + processIdentifier: ProcessIdentifier, + inputWriter: StandardInputWriter?, + outputStream: AsyncBufferSequence?, + errorStream: AsyncBufferSequence? ) { self.processIdentifier = processIdentifier + self.inputWriter = inputWriter + self.outputStream = outputStream + self.errorStream = errorStream + } +} + +extension Execution where Input == CustomWriteInput { + /// A writer that sends data to the subprocess's standard input. + /// + /// Call ``StandardInputWriter/finish()`` after the last write so the + /// subprocess sees end-of-file on its standard input. + public var standardInputWriter: StandardInputWriter { + return self.inputWriter! + } +} + +extension Execution where Output == SequenceOutput { + /// The standard output of the subprocess as an asynchronous sequence of + /// buffers. + public var standardOutput: AsyncBufferSequence { + return self.outputStream! + } +} + +extension Execution where Error == SequenceOutput { + /// The standard error of the subprocess as an asynchronous sequence of + /// buffers. + public var standardError: AsyncBufferSequence { + return self.errorStream! + } +} + +extension Execution { + @available(*, unavailable, message: "this property requires that the input is .standardInput") + public var standardInputWriter: StandardInputWriter { + fatalError() + } + + @available(*, unavailable, message: "this property requires that the output is .sequence") + public var standardOutput: AsyncBufferSequence { + fatalError() + } + + @available(*, unavailable, message: "this property requires that the error is .sequence") + public var standardError: AsyncBufferSequence { + fatalError() } } diff --git a/Sources/Subprocess/IO/Input.swift b/Sources/Subprocess/IO/Input.swift index 9d249940..1686260f 100644 --- a/Sources/Subprocess/IO/Input.swift +++ b/Sources/Subprocess/IO/Input.swift @@ -146,15 +146,21 @@ public struct ArrayInput: InputProtocol { } } -/// A concrete input type that the run closure uses to write custom input -/// into the subprocess. -internal struct CustomWriteInput: InputProtocol { - /// Asynchronously write the input to the subprocess using the - /// write file descriptor. +/// An input type that lets the body closure write to the subprocess's +/// standard input through ``Execution/standardInputWriter``. +/// +/// Use ``InputProtocol/inputWriter`` to create a value of this type when you +/// call a `run` function that takes a body closure. The closure writes the +/// input through ``Execution/standardInputWriter`` and calls +/// ``StandardInputWriter/finish()`` to signal end-of-file. +public struct CustomWriteInput: InputProtocol { + /// Writes the input to the subprocess asynchronously. + /// + /// This method is a no-op. ``CustomWriteInput`` exposes the + /// ``StandardInputWriter`` directly to the body closure, which writes + /// to the subprocess. public func write(with writer: StandardInputWriter) async throws { - // Intentional no-op - // CustomWriteInput exposes the StandardInputWriter directly - // to the caller's closure, so writing is handled there instead. + // Intentional no-op. } internal init() {} @@ -220,6 +226,16 @@ extension InputProtocol { } } +extension InputProtocol where Self == CustomWriteInput { + /// Creates a subprocess input that the body closure writes to through + /// ``Execution/standardInputWriter``. + /// + /// Use this input with a `run` overload that takes a body closure. + public static var inputWriter: Self { + return CustomWriteInput() + } +} + extension InputProtocol { internal func createPipe() throws(SubprocessError) -> CreatedPipe { if let noInput = self as? NoInput { diff --git a/Sources/Subprocess/IO/Output.swift b/Sources/Subprocess/IO/Output.swift index 602bc3bd..6b1b0fec 100644 --- a/Sources/Subprocess/IO/Output.swift +++ b/Sources/Subprocess/IO/Output.swift @@ -173,11 +173,14 @@ public struct BytesOutput: OutputProtocol, ErrorOutputProtocol { } } -/// A concrete `Output` type for subprocesses that redirects the child output to -/// the `.currentStandardOutput` (a sequence) or `.currentStandardError` property of -/// `Execution`. This output type is only applicable to the `run()` family that -/// takes a custom closure. -internal struct SequenceOutput: OutputProtocol { +/// An output type that streams the subprocess's output through the body +/// closure as an asynchronous sequence of buffers. +/// +/// Use ``OutputProtocol/sequence`` to create a value of this type when you +/// call a `run` function that takes a body closure. The closure reads the +/// output by iterating ``Execution/standardOutput`` or +/// ``Execution/standardError``. +public struct SequenceOutput: OutputProtocol, ErrorOutputProtocol { /// The output type for this output option. public typealias OutputType = Void @@ -257,6 +260,16 @@ extension OutputProtocol where Self == BytesOutput { } } +extension OutputProtocol where Self == SequenceOutput { + /// Creates a subprocess output that the body closure reads from + /// ``Execution/standardOutput`` or ``Execution/standardError``. + /// + /// Use this output with a `run` overload that takes a body closure. + public static var sequence: Self { + return SequenceOutput() + } +} + // MARK: - ErrorOutputProtocol /// A type that serves as the standard error output target for a subprocess. @@ -276,7 +289,10 @@ public protocol ErrorOutputProtocol: OutputProtocol {} /// streams together, making it possible to process all subprocess output as a unified /// stream rather than handling standard output and standard error separately. public struct CombinedErrorOutput: ErrorOutputProtocol { + /// The output type for this output option. public typealias OutputType = Void + + internal init() {} } extension ErrorOutputProtocol { diff --git a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift index 0f52ae84..b827876e 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift @@ -473,11 +473,8 @@ extension Configuration { errorWrite: errorWriteFileDescriptor ) - let execution = Execution( - processIdentifier: .init(value: pid) - ) return SpawnResult( - execution: execution, + processIdentifier: .init(value: pid), inputWriteEnd: inputWriteFileDescriptor, outputReadEnd: outputReadFileDescriptor, errorReadEnd: errorReadFileDescriptor diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 5228e882..8881bbe6 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -551,14 +551,12 @@ extension Configuration { errorRead: nil, errorWrite: errorWriteFileDescriptor ) - let execution = Execution( - processIdentifier: .init( - value: pid, - processDescriptor: processDescriptor - ) + let processIdentifier: ProcessIdentifier = .init( + value: pid, + processDescriptor: processDescriptor ) return SpawnResult( - execution: execution, + processIdentifier: processIdentifier, inputWriteEnd: inputWriteFileDescriptor, outputReadEnd: outputReadFileDescriptor, errorReadEnd: errorReadFileDescriptor diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index 02fc77df..85575c71 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -197,9 +197,6 @@ extension Configuration { processDescriptor: processInfo.hProcess, threadHandle: processInfo.hThread ) - let execution = Execution( - processIdentifier: pid - ) do { // After spawn finishes, close all child side fds @@ -219,7 +216,7 @@ extension Configuration { } return SpawnResult( - execution: execution, + processIdentifier: pid, inputWriteEnd: inputWriteFileDescriptor, outputReadEnd: outputReadFileDescriptor, errorReadEnd: errorReadFileDescriptor @@ -407,9 +404,6 @@ extension Configuration { processDescriptor: processInfo.hProcess, threadHandle: processInfo.hThread ) - let execution = Execution( - processIdentifier: pid - ) do { // After spawn finishes, close all child side fds @@ -429,7 +423,7 @@ extension Configuration { } return SpawnResult( - execution: execution, + processIdentifier: pid, inputWriteEnd: inputWriteFileDescriptor, outputReadEnd: outputReadFileDescriptor, errorReadEnd: errorReadFileDescriptor diff --git a/Sources/Subprocess/Result.swift b/Sources/Subprocess/Result.swift index 63a7f3cc..77be17a7 100644 --- a/Sources/Subprocess/Result.swift +++ b/Sources/Subprocess/Result.swift @@ -17,21 +17,19 @@ import SystemPackage // MARK: - Result -/// The outcome of a subprocess execution, containing the closure's return value and the termination status of the child process. -public struct ExecutionOutcome: Sendable { - /// The termination status of the child process. - public let terminationStatus: TerminationStatus - /// The value returned by the closure passed to the `run` method. - public let value: Result - - internal init(terminationStatus: TerminationStatus, value: Result) { - self.terminationStatus = terminationStatus - self.value = value - } -} - -/// The result of running a subprocess, including collected standard output and standard error. -public struct ExecutionRecord< +/// The result of running a subprocess, including the closure's return value, +/// collected standard output, and collected standard error. +/// +/// The `ClosureResult` generic parameter is `Void` when you call a `run(...)` +/// overload that doesn't take a `body` closure. It's the closure's return type +/// otherwise. You access the closure's return value with ``closureOutput``. +/// +/// The ``standardOutput`` and ``standardError`` properties are available when +/// the corresponding output type produces a non-`Void` value. They're +/// unavailable for output types such as ``DiscardedOutput``, ``SequenceOutput``, +/// and ``FileDescriptorOutput``. +public struct ExecutionResult< + ClosureResult: Sendable, Output: OutputProtocol, Error: OutputProtocol >: Sendable { @@ -39,38 +37,44 @@ public struct ExecutionRecord< public let processIdentifier: ProcessIdentifier /// The termination status of the subprocess. public let terminationStatus: TerminationStatus + /// The collected standard output of the subprocess. public let standardOutput: Output.OutputType /// The collected standard error of the subprocess. public let standardError: Error.OutputType + /// The value returned by the body closure passed to `run`. + public let closureOutput: ClosureResult + internal init( processIdentifier: ProcessIdentifier, terminationStatus: TerminationStatus, + closureOutput: ClosureResult, standardOutput: Output.OutputType, standardError: Error.OutputType ) { self.processIdentifier = processIdentifier self.terminationStatus = terminationStatus + self.closureOutput = closureOutput self.standardOutput = standardOutput self.standardError = standardError } } -// MARK: - ExecutionRecord Conformances +// MARK: - ExecutionResult Conformances -extension ExecutionRecord: Equatable where Output.OutputType: Equatable, Error.OutputType: Equatable {} +extension ExecutionResult: Equatable where Output.OutputType: Equatable, Error.OutputType: Equatable, ClosureResult: Equatable {} -extension ExecutionRecord: Hashable where Output.OutputType: Hashable, Error.OutputType: Hashable {} +extension ExecutionResult: Hashable where Output.OutputType: Hashable, Error.OutputType: Hashable, ClosureResult: Hashable {} -extension ExecutionRecord: CustomStringConvertible -where Output.OutputType: CustomStringConvertible, Error.OutputType: CustomStringConvertible { +extension ExecutionResult: CustomStringConvertible where Output.OutputType: CustomStringConvertible, Error.OutputType: CustomStringConvertible { /// A textual representation of the collected result. public var description: String { return """ - ExecutionRecord( + ExecutionResult( processIdentifier: \(self.processIdentifier), terminationStatus: \(self.terminationStatus.description), + closureOutput: \(String(describing: self.closureOutput)), standardOutput: \(self.standardOutput.description) standardError: \(self.standardError.description) ) @@ -78,14 +82,15 @@ where Output.OutputType: CustomStringConvertible, Error.OutputType: CustomString } } -extension ExecutionRecord: CustomDebugStringConvertible +extension ExecutionResult: CustomDebugStringConvertible where Output.OutputType: CustomDebugStringConvertible, Error.OutputType: CustomDebugStringConvertible { /// A debug-oriented textual representation of the collected result. public var debugDescription: String { return """ - ExecutionRecord( + ExecutionResult( processIdentifier: \(self.processIdentifier), - terminationStatus: \(self.terminationStatus.description), + terminationStatus: \(self.terminationStatus.debugDescription), + closureOutput: \(String(describing: self.closureOutput)), standardOutput: \(self.standardOutput.debugDescription) standardError: \(self.standardError.debugDescription) ) @@ -93,14 +98,29 @@ where Output.OutputType: CustomDebugStringConvertible, Error.OutputType: CustomD } } -// MARK: - ExecutionOutcome Conformances +// MARK: - ExecutionOutcome + +/// The outcome of a subprocess execution, containing the closure's return +/// value and the termination status of the child process. +internal struct ExecutionOutcome: Sendable { + /// The termination status of the child process. + internal let terminationStatus: TerminationStatus + /// The value returned by the closure passed to the `run` method. + internal let value: Result + + internal init(terminationStatus: TerminationStatus, value: Result) { + self.terminationStatus = terminationStatus + self.value = value + } +} + extension ExecutionOutcome: Equatable where Result: Equatable {} extension ExecutionOutcome: Hashable where Result: Hashable {} extension ExecutionOutcome: CustomStringConvertible where Result: CustomStringConvertible { /// A textual representation of the execution result. - public var description: String { + var description: String { return """ ExecutionOutcome( terminationStatus: \(self.terminationStatus.description), @@ -112,7 +132,7 @@ extension ExecutionOutcome: CustomStringConvertible where Result: CustomStringCo extension ExecutionOutcome: CustomDebugStringConvertible where Result: CustomDebugStringConvertible { /// A debug-oriented textual representation of this execution result. - public var debugDescription: String { + var debugDescription: String { return """ ExecutionOutcome( terminationStatus: \(self.terminationStatus.debugDescription), @@ -121,18 +141,3 @@ extension ExecutionOutcome: CustomDebugStringConvertible where Result: CustomDeb """ } } - -// MARK: - Deprecated -@available( - *, deprecated, - renamed: "ExecutionOutcome", - message: "ExecutionResult has been renamed to ExecutionOutcome. ExecutionResult will be removed in 1.0" -) -public typealias ExecutionResult = ExecutionOutcome - -@available( - *, deprecated, - renamed: "ExecutionRecord", - message: "CollectedResult has been renamed to ExecutionRecord. CollectedResult will be removed in 1.0" -) -public typealias CollectedResult = ExecutionRecord diff --git a/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift b/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift index b6dae6e7..86f76e09 100644 --- a/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift +++ b/Sources/Subprocess/SubprocessFoundation/Input+Foundation.swift @@ -75,6 +75,7 @@ public struct DataAsyncSequenceInput< /// Asynchronously write the input to the subprocess using the /// write file descriptor. + @concurrent public func write(with writer: StandardInputWriter) async throws(SubprocessError) { do { for try await chunk in self.sequence { @@ -111,7 +112,7 @@ extension InputProtocol { /// Creates a subprocess input from an asynchronous sequence of `Data` values. public static func sequence( _ asyncSequence: InputSequence - ) -> Self where Self == DataAsyncSequenceInput { + ) -> Self where Self == DataAsyncSequenceInput, InputSequence.Element == Data { return .init(underlying: asyncSequence) } } @@ -129,6 +130,7 @@ extension StandardInputWriter { /// Writes an asynchronous sequence of `Data` to the subprocess's standard input. /// - Parameter asyncSequence: The sequence of data to write. /// - Returns: The number of bytes written. + @concurrent public func write( _ asyncSequence: AsyncSendableSequence ) async throws(SubprocessError) -> Int where AsyncSendableSequence.Element == Data { diff --git a/Tests/SubprocessTests/DarwinTests.swift b/Tests/SubprocessTests/DarwinTests.swift index 93d074bc..b59a8f3d 100644 --- a/Tests/SubprocessTests/DarwinTests.swift +++ b/Tests/SubprocessTests/DarwinTests.swift @@ -80,8 +80,10 @@ struct SubprocessDarwinTests { _ = try await Subprocess.run( // This will intentionally hang .path("/bin/cat"), + input: .none, + output: .discarded, error: .discarded - ) { subprocess, standardOutput in + ) { subprocess in // First suspend the process try subprocess.send(signal: .suspend) var suspendedStatus: Int32 = 0 @@ -95,7 +97,6 @@ struct SubprocessDarwinTests { // Now kill the process try subprocess.send(signal: .terminate) - for try await _ in standardOutput {} } } } diff --git a/Tests/SubprocessTests/IntegrationTests.swift b/Tests/SubprocessTests/IntegrationTests.swift index 810b7b05..d87ff227 100644 --- a/Tests/SubprocessTests/IntegrationTests.swift +++ b/Tests/SubprocessTests/IntegrationTests.swift @@ -994,24 +994,26 @@ extension SubprocessIntegrationTests { ) let result = try await _run( setup, + input: .inputWriter, + output: .sequence, error: .discarded - ) { execution, standardInputWriter, standardOutput in + ) { execution in async let buffer = { var _buffer = Data() - for try await chunk in standardOutput { + for try await chunk in execution.standardOutput { let currentChunk = chunk.withUnsafeBytes { Data($0) } _buffer += currentChunk } return _buffer }() - _ = try await standardInputWriter.write(Array(expected)) - try await standardInputWriter.finish() + _ = try await execution.standardInputWriter.write(Array(expected)) + try await execution.standardInputWriter.finish() return try await buffer } #expect(result.terminationStatus.isSuccess) - #expect(result.value == expected) + #expect(result.closureOutput == expected) } @Test func testNoInputTriggersEOF() async throws { @@ -1690,24 +1692,26 @@ extension SubprocessIntegrationTests { ) let result = try await _run( setup, - output: .discarded - ) { execution, standardInputWriter, standardError in + input: .inputWriter, + output: .discarded, + error: .sequence + ) { execution in async let buffer = { var _buffer = Data() - for try await chunk in standardError { + for try await chunk in execution.standardError { let currentChunk = chunk.withUnsafeBytes { Data($0) } _buffer += currentChunk } return _buffer }() - _ = try await standardInputWriter.write(Array(expected)) - try await standardInputWriter.finish() + _ = try await execution.standardInputWriter.write(Array(expected)) + try await execution.standardInputWriter.finish() return try await buffer } #expect(result.terminationStatus.isSuccess) - #expect(result.value == expected) + #expect(result.closureOutput == expected) } @Test func stressTestWithLittleOutput() async throws { @@ -1863,9 +1867,10 @@ extension SubprocessIntegrationTests { _ = try await _run( setup, input: .none, + output: .sequence, error: .discarded, - ) { execution, standardOutput in - for try await line in standardOutput.strings() { + ) { execution in + for try await line in execution.standardOutput.strings() { // If we use default buffer size this test will hang // because Subprocess is stuck on waiting 16k worth of // output when there are only 3. @@ -2031,10 +2036,11 @@ extension SubprocessIntegrationTests { _ = try await _run( setup, input: .none, + output: .sequence, error: .combinedWithOutput - ) { execution, standardOutput in + ) { execution in var output: String = "" - for try await line in standardOutput.strings() { + for try await line in execution.standardOutput.strings() { output += line } #expect(output.contains("Hello Stdout")) @@ -2057,12 +2063,14 @@ extension SubprocessIntegrationTests { _ = try await _run( setup, - input: .none - ) { execution, standardOutput, standardError in + input: .none, + output: .sequence, + error: .sequence + ) { execution in try await withThrowingTaskGroup { group in group.addTask { var stdout: String = "" - for try await line in standardOutput.strings() { + for try await line in execution.standardOutput.strings() { stdout += line } #expect(stdout.contains("Hello Stdout")) @@ -2070,7 +2078,7 @@ extension SubprocessIntegrationTests { group.addTask { var stderr: String = "" - for try await line in standardError.strings() { + for try await line in execution.standardError.strings() { stderr += line } #expect(stderr.contains("Hello Stderr")) @@ -2082,6 +2090,203 @@ extension SubprocessIntegrationTests { } } +// MARK: - Unified run() Tests +extension SubprocessIntegrationTests { + @Test func testBodyClosureReturnValue() async throws { + #if os(Windows) + let setup = TestSetup( + executable: .name("cmd.exe"), + arguments: ["/c", "echo hello"] + ) + #else + let setup = TestSetup( + executable: .path("/bin/echo"), + arguments: ["hello"] + ) + #endif + let result = try await _run( + setup, + input: .none, + output: .discarded, + error: .discarded + ) { _ in + return 42 + } + #expect(result.terminationStatus.isSuccess) + #expect(result.closureOutput == 42) + } + + @Test func testBodyClosureReturnValueWithCapturedOutput() async throws { + #if os(Windows) + let setup = TestSetup( + executable: .name("cmd.exe"), + arguments: ["/c", "echo hello"] + ) + #else + let setup = TestSetup( + executable: .path("/bin/echo"), + arguments: ["hello"] + ) + #endif + let result = try await _run( + setup, + input: .none, + output: .string(limit: 64), + error: .discarded + ) { _ in + return "closure-value" + } + #expect(result.terminationStatus.isSuccess) + #expect(result.closureOutput == "closure-value") + let output = try #require(result.standardOutput) + #expect(output.contains("hello")) + } + + @Test func testBodyClosureThrows() async throws { + struct BodyError: Error, Equatable {} + + #if os(Windows) + let setup = TestSetup( + executable: .name("cmd.exe"), + arguments: ["/c", "echo hello"] + ) + #else + let setup = TestSetup( + executable: .path("/bin/echo"), + arguments: ["hello"] + ) + #endif + await #expect(throws: BodyError.self) { + let _: ExecutionResult = try await _run( + setup, + input: .none, + output: .discarded, + error: .discarded + ) { _ in + throw BodyError() + } + } + } + + @Test func testBodyClosureThrowsWithInputWriter() async throws { + // Regression test for the CustomWriteInput hang fix: when the body + // throws, run() must still finish() the input writer so the child + // process sees EOF on stdin and terminates. Otherwise this test hangs. + struct BodyError: Error, Equatable {} + + #if os(Windows) + let setup = TestSetup( + executable: .name("cmd.exe"), + arguments: ["/c", "findstr x*"] + ) + #else + let setup = TestSetup( + executable: .path("/bin/cat"), + arguments: [] + ) + #endif + await #expect(throws: BodyError.self) { + let _: ExecutionResult = try await _run( + setup, + input: .inputWriter, + output: .discarded, + error: .discarded + ) { _ in + throw BodyError() + } + } + } + + @Test func testStreamingOutputWithCapturedError() async throws { + #if os(Windows) + let setup = TestSetup( + executable: .name("cmd.exe"), + arguments: ["/c", "echo Hello Stdout & echo Hello Stderr 1>&2"] + ) + #else + let setup = TestSetup( + executable: .path("/bin/sh"), + arguments: ["-c", "echo Hello Stdout; echo Hello Stderr 1>&2"] + ) + #endif + let result = try await _run( + setup, + input: .none, + output: .sequence, + error: .string(limit: 64) + ) { execution in + var collected = "" + for try await line in execution.standardOutput.strings() { + collected += line + } + return collected + } + #expect(result.terminationStatus.isSuccess) + #expect(result.closureOutput.contains("Hello Stdout")) + let stderr = try #require(result.standardError) + #expect(stderr.contains("Hello Stderr")) + } + + @Test func testStreamingErrorWithCapturedOutput() async throws { + #if os(Windows) + let setup = TestSetup( + executable: .name("cmd.exe"), + arguments: ["/c", "echo Hello Stdout & echo Hello Stderr 1>&2"] + ) + #else + let setup = TestSetup( + executable: .path("/bin/sh"), + arguments: ["-c", "echo Hello Stdout; echo Hello Stderr 1>&2"] + ) + #endif + let result = try await _run( + setup, + input: .none, + output: .string(limit: 64), + error: .sequence + ) { execution in + var collected = "" + for try await line in execution.standardError.strings() { + collected += line + } + return collected + } + #expect(result.terminationStatus.isSuccess) + #expect(result.closureOutput.contains("Hello Stderr")) + let stdout = try #require(result.standardOutput) + #expect(stdout.contains("Hello Stdout")) + } + + @Test func testInputWriterWithoutExplicitFinish() async throws { + // Verifies that run() finishes the input writer after the body returns + // when the caller forgot to. Without this, the child never sees EOF on + // stdin and this test hangs. + #if os(Windows) + let setup = TestSetup( + executable: .name("cmd.exe"), + arguments: ["/c", "findstr x*"] + ) + #else + let setup = TestSetup( + executable: .path("/bin/cat"), + arguments: [] + ) + #endif + let result = try await _run( + setup, + input: .inputWriter, + output: .string(limit: 64), + error: .discarded + ) { execution in + _ = try await execution.standardInputWriter.write("hello\n") + // Intentionally don't call finish(); run() should do it. + } + #expect(result.terminationStatus.isSuccess) + let output = try #require(result.standardOutput) + #expect(output.contains("hello")) + } +} + // MARK: - Other Tests extension SubprocessIntegrationTests { @Test func testTerminateProcess() async throws { @@ -2100,15 +2305,15 @@ extension SubprocessIntegrationTests { // This will intentionally hang setup, input: .none, + output: .discarded, error: .discarded - ) { subprocess, standardOutput in + ) { subprocess in // Make sure we can send signals to terminate the process #if os(Windows) try subprocess.terminate(withExitCode: 99) #else try subprocess.send(signal: .terminate) #endif - for try await _ in standardOutput {} } #if os(Windows) @@ -2242,10 +2447,11 @@ extension SubprocessIntegrationTests { _ = try await _run( setup, input: .none, + output: .sequence, error: .discarded - ) { execution, standardOutput in + ) { execution in var index = 0 - for try await line in standardOutput.strings(as: UTF8.self) { + for try await line in execution.standardOutput.strings(as: UTF8.self) { defer { index += 1 } try #require(index < testCases.count, "Received more lines than expected") #expect( @@ -2391,28 +2597,28 @@ extension SubprocessIntegrationTests { ) #endif - let result = try await _run(setup) { execution, standardInputWriter, standardOutput, standardError in + let result = try await _run(setup, input: .inputWriter, output: .sequence, error: .sequence) { execution in return try await withThrowingTaskGroup(of: OutputCaptureState?.self) { group in group.addTask { #if os(Windows) - _ = try await standardInputWriter.write("echo off\n") + _ = try await execution.standardInputWriter.write("echo off\n") #endif - _ = try await standardInputWriter.write("echo hello stdout\n") - _ = try await standardInputWriter.write("echo >&2 hello stderr\n") - _ = try await standardInputWriter.write("exit 0\n") - try await standardInputWriter.finish() + _ = try await execution.standardInputWriter.write("echo hello stdout\n") + _ = try await execution.standardInputWriter.write("echo >&2 hello stderr\n") + _ = try await execution.standardInputWriter.write("exit 0\n") + try await execution.standardInputWriter.finish() return nil } group.addTask { var result = "" - for try await line in standardOutput.strings() { + for try await line in execution.standardOutput.strings() { result += line } return .standardOutputCaptured(result.trimmingNewLineAndQuotes()) } group.addTask { var result = "" - for try await line in standardError.strings() { + for try await line in execution.standardError.strings() { result += line } return .standardErrorCaptured(result.trimmingNewLineAndQuotes()) @@ -2438,11 +2644,11 @@ extension SubprocessIntegrationTests { #expect(result.terminationStatus.isSuccess) #if os(Windows) // cmd.exe interactive mode prints more info - #expect(result.value.output.contains("hello stdout")) + #expect(result.closureOutput.output.contains("hello stdout")) #else - #expect(result.value.output == "hello stdout") + #expect(result.closureOutput.output == "hello stdout") #endif - #expect(result.value.error == "hello stderr") + #expect(result.closureOutput.error == "hello stderr") } @Test func testSubprocessPipeChain() async throws { @@ -2653,9 +2859,9 @@ extension SubprocessIntegrationTests { ) #endif - _ = try await _run(setup) { execution, inputWriter, standardOutput, standardError in + _ = try await _run(setup, input: .none, output: .sequence, error: .discarded) { execution in var output: [String] = [] - for try await line in standardOutput.strings(separatedBy: .lineBreaks) { + for try await line in execution.standardOutput.strings(separatedBy: .lineBreaks) { output.append(line) } @@ -2676,15 +2882,15 @@ extension SubprocessIntegrationTests { arguments: ["-c", "/bin/echo -n x; /bin/echo >&2 -n y"] ) #endif - _ = try await _run(setup) { execution, inputWriter, standardOutput, standardError in + _ = try await _run(setup, input: .inputWriter, output: .sequence, error: .sequence) { execution in try await withThrowingTaskGroup { group in group.addTask { - try await inputWriter.finish() + try await execution.standardInputWriter.finish() } group.addTask { var result = "" - for try await line in standardOutput.strings() { + for try await line in execution.standardOutput.strings() { result += line } #expect(result.trimmingCharacters(in: .whitespaces) == "x") @@ -2692,7 +2898,7 @@ extension SubprocessIntegrationTests { group.addTask { var result = "" - for try await line in standardError.strings() { + for try await line in execution.standardError.strings() { result += line } #expect(result.trimmingCharacters(in: .whitespaces) == "y") @@ -2725,9 +2931,9 @@ extension SubprocessIntegrationTests { ] ) #endif - _ = try await _run(setup) { execution, inputWriter, standardOutput, standardError in + _ = try await _run(setup, input: .none, output: .sequence, error: .discarded) { execution in var output: [String] = [] - for try await line in standardOutput.strings(separatedBy: .unicodeScalarSequence(["\0"])) { + for try await line in execution.standardOutput.strings(separatedBy: .unicodeScalarSequence(["\0"])) { output.append(line) } @@ -2754,9 +2960,9 @@ extension SubprocessIntegrationTests { ] ) #endif - _ = try await _run(setup) { execution, inputWriter, standardOutput, standardError in + _ = try await _run(setup, input: .none, output: .sequence, error: .discarded) { execution in var output: [String] = [] - for try await line in standardOutput.strings(separatedBy: .unicodeScalarSequence(["?", ":"])) { + for try await line in execution.standardOutput.strings(separatedBy: .unicodeScalarSequence(["?", ":"])) { output.append(line) } @@ -2786,9 +2992,9 @@ extension SubprocessIntegrationTests { ] ) #endif - _ = try await _run(setup) { execution, inputWriter, standardOutput, standardError in + _ = try await _run(setup, input: .none, output: .sequence, error: .discarded) { execution in var output: [String] = [] - for try await line in standardOutput.strings(separatedBy: .unicodeScalarSequence(["\u{2022}"])) { + for try await line in execution.standardOutput.strings(separatedBy: .unicodeScalarSequence(["\u{2022}"])) { output.append(line) } @@ -2819,9 +3025,9 @@ extension SubprocessIntegrationTests { ] ) #endif - _ = try await _run(setup) { execution, inputWriter, standardOutput, standardError in + _ = try await _run(setup, input: .none, output: .sequence, error: .discarded) { execution in var output: [String] = [] - for try await line in standardOutput.strings(separatedBy: .unicodeScalarSequence(["|"])) { + for try await line in execution.standardOutput.strings(separatedBy: .unicodeScalarSequence(["|"])) { output.append(line) } @@ -2848,9 +3054,11 @@ extension SubprocessIntegrationTests { ] ) #endif - _ = try await _run(setup) { execution, inputWriter, standardOutput, standardError in + _ = try await _run(setup, input: .none, output: .sequence, error: .discarded) { execution in var output: [String] = [] - for try await line in standardOutput.strings(separatedBy: .unicodeScalarSequence(["|"])) { + for try await line in execution.standardOutput.strings( + separatedBy: .unicodeScalarSequence(["|"]) + ) { output.append(line) } @@ -2897,9 +3105,9 @@ extension SubprocessIntegrationTests { ) #endif - _ = try await _run(setup) { execution, inputWriter, standardOutput, standardError in + _ = try await _run(setup, input: .none, output: .sequence, error: .sequence) { execution in var output: [String] = [] - for try await line in standardOutput.strings() { + for try await line in execution.standardOutput.strings() { output.append(line) } #expect(output == ["100°C", "32—x", "€5"]) @@ -2946,7 +3154,7 @@ func _run< input: Input, output: Output, error: Error -) async throws -> ExecutionRecord { +) async throws -> ExecutionResult { return try await Subprocess.run( testSetup.executable, arguments: testSetup.arguments, @@ -2968,7 +3176,7 @@ func _run< input: borrowing Span, output: Output, error: Error -) async throws -> ExecutionRecord { +) async throws -> ExecutionResult { return try await Subprocess.run( testSetup.executable, arguments: testSetup.arguments, @@ -2980,92 +3188,21 @@ func _run< ) } -func _run< - Result, - Input: InputProtocol, - Error: ErrorOutputProtocol ->( +func _run( _ setup: TestSetup, input: Input, + output: Output, error: Error, - body: ((Execution, AsyncBufferSequence) async throws -> Result) -) async throws -> ExecutionOutcome where Error.OutputType == Void { - return try await Subprocess.run( - setup.executable, - arguments: setup.arguments, - environment: setup.environment, - workingDirectory: setup.workingDirectory, - input: input, - error: error, - body: body - ) -} - -func _run< - Result, - Input: InputProtocol ->( - _ setup: TestSetup, - input: Input, - preferredBufferSize: Int? = nil, - body: ((Execution, AsyncBufferSequence, AsyncBufferSequence) async throws -> Result) -) async throws -> ExecutionOutcome { + body: ((Execution) async throws -> Result) +) async throws -> ExecutionResult { return try await Subprocess.run( setup.executable, arguments: setup.arguments, environment: setup.environment, workingDirectory: setup.workingDirectory, input: input, - preferredBufferSize: preferredBufferSize, - body: body - ) -} - -func _run< - Result, - Error: ErrorOutputProtocol ->( - _ setup: TestSetup, - error: Error, - body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result) -) async throws -> ExecutionOutcome where Error.OutputType == Void { - return try await Subprocess.run( - setup.executable, - arguments: setup.arguments, - environment: setup.environment, - workingDirectory: setup.workingDirectory, - error: error, - body: body - ) -} - -func _run< - Result, - Output: OutputProtocol ->( - _ setup: TestSetup, - output: Output, - body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result) -) async throws -> ExecutionOutcome where Output.OutputType == Void { - return try await Subprocess.run( - setup.executable, - arguments: setup.arguments, - environment: setup.environment, - workingDirectory: setup.workingDirectory, output: output, - body: body - ) -} - -func _run( - _ setup: TestSetup, - body: ((Execution, StandardInputWriter, AsyncBufferSequence, AsyncBufferSequence) async throws -> Result) -) async throws -> ExecutionOutcome { - return try await Subprocess.run( - setup.executable, - arguments: setup.arguments, - environment: setup.environment, - workingDirectory: setup.workingDirectory, + error: error, body: body ) } diff --git a/Tests/SubprocessTests/LinuxTests.swift b/Tests/SubprocessTests/LinuxTests.swift index 9a6557e2..12c3b0da 100644 --- a/Tests/SubprocessTests/LinuxTests.swift +++ b/Tests/SubprocessTests/LinuxTests.swift @@ -68,8 +68,10 @@ struct SubprocessLinuxTests { // This will intentionally hang .path("/usr/bin/sleep"), arguments: ["infinity"], + input: .none, + output: .discarded, error: .discarded - ) { subprocess, standardOutput in + ) { subprocess in // First suspend the process try subprocess.send(signal: .suspend) var thread1: pthread_t? = nil @@ -93,7 +95,6 @@ struct SubprocessLinuxTests { // Now kill the process try subprocess.send(signal: .terminate) - for try await _ in standardOutput {} if let thread1 { pthread_join(thread1, nil) @@ -107,6 +108,7 @@ struct SubprocessLinuxTests { @Test func testUniqueProcessIdentifier() async throws { _ = try await Subprocess.run( .path("/bin/echo"), + input: .none, output: .discarded, error: .discarded ) { subprocess in diff --git a/Tests/SubprocessTests/ProcessMonitoringTests.swift b/Tests/SubprocessTests/ProcessMonitoringTests.swift index 5a1f4745..74ee78db 100644 --- a/Tests/SubprocessTests/ProcessMonitoringTests.swift +++ b/Tests/SubprocessTests/ProcessMonitoringTests.swift @@ -102,7 +102,7 @@ struct SubprocessProcessMonitoringTests { private func withSpawnedExecution( config: Configuration, - _ body: (Execution) async throws -> Void + _ body: (Execution) async throws -> Void ) async throws { let spawnResult = try await config.spawn( withInput: self.devNullInputPipe(), @@ -110,9 +110,15 @@ struct SubprocessProcessMonitoringTests { errorPipe: self.devNullOutputPipe() ) defer { - spawnResult.execution.processIdentifier.close() + spawnResult.processIdentifier.close() } - try await body(spawnResult.execution) + let execution = Execution( + processIdentifier: spawnResult.processIdentifier, + inputWriter: nil, + outputStream: nil, + errorStream: nil + ) + try await body(execution) } } @@ -307,7 +313,7 @@ extension SubprocessProcessMonitoringTests { outputPipe: self.devNullOutputPipe(), errorPipe: self.devNullOutputPipe() ) - spawnedProcesses.append(spawnResult.execution.processIdentifier) + spawnedProcesses.append(spawnResult.processIdentifier) } try await withThrowingTaskGroup { group in diff --git a/Tests/SubprocessTests/UnixTests.swift b/Tests/SubprocessTests/UnixTests.swift index 60b8a0b3..659ba269 100644 --- a/Tests/SubprocessTests/UnixTests.swift +++ b/Tests/SubprocessTests/UnixTests.swift @@ -183,8 +183,9 @@ extension SubprocessUnixTests { """, ], input: .none, + output: .sequence, error: .discarded - ) { subprocess, standardOutput in + ) { subprocess in return try await withThrowingTaskGroup(of: Void.self) { group in group.addTask { try await Task.sleep(for: .milliseconds(200)) @@ -197,7 +198,7 @@ extension SubprocessUnixTests { } group.addTask { var outputs: [String] = [] - for try await line in standardOutput.strings() { + for try await line in subprocess.standardOutput.strings() { outputs.append(line.trimmingCharacters(in: .newlines)) } #expect(outputs == ["saw SIGQUIT", "saw SIGTERM", "saw SIGINT"]) @@ -332,12 +333,14 @@ extension SubprocessUnixTests { """, ], platformOptions: platformOptions, + input: .none, + output: .sequence, error: .fileDescriptor(.standardError, closeAfterSpawningProcess: false) - ) { execution, standardOutput in + ) { execution in // Read stdout incrementally. Once we see the PID line, // we know the trap is set up and it's safe to send SIGINT. var grandChildPid: pid_t? - for try await line in standardOutput.strings() { + for try await line in execution.standardOutput.strings() { let trimmed = line.trimmingCharacters(in: .whitespacesAndNewlines) if let pid = pid_t(trimmed) { grandChildPid = pid @@ -350,7 +353,7 @@ extension SubprocessUnixTests { // Make sure the grand child `/usr/bin/yes` actually exited // This is unfortunately racy because the pid isn't immediately invalided // once `kill` returns. Allow a few failures and delay to counter this - let grandChildPid = try #require(result.value) + let grandChildPid = try #require(result.closureOutput) for _ in 0..<10 { let rc = kill(grandChildPid, 0) if rc == 0 { @@ -417,10 +420,12 @@ extension SubprocessUnixTests { """, ], platformOptions: platformOptions, + input: .none, + output: .sequence, error: .fileDescriptor(.standardError, closeAfterSpawningProcess: false) - ) { _, standardOutput in + ) { execution in var grandChildPid: pid_t? - for try await line in standardOutput.strings() { + for try await line in execution.standardOutput.strings() { let trimmed = line.trimmingCharacters(in: .whitespacesAndNewlines) if let pid = pid_t(trimmed) { grandChildPid = pid @@ -430,7 +435,7 @@ extension SubprocessUnixTests { return grandChildPid } #expect(result.terminationStatus == .signaled(SIGTERM)) - let grandChildPid = try #require(result.value) + let grandChildPid = try #require(result.closureOutput) // Grandchild should have been signalled via the process group. // Allow a few iterations for signal propagation and reaping. for _ in 0..<10 { @@ -591,7 +596,8 @@ extension SubprocessUnixTests { } internal func assertNewSessionCreated( - with result: ExecutionRecord< + with result: ExecutionResult< + Void, StringOutput, Output > diff --git a/Tests/SubprocessTests/WindowsTests.swift b/Tests/SubprocessTests/WindowsTests.swift index 3087b05f..7abe744b 100644 --- a/Tests/SubprocessTests/WindowsTests.swift +++ b/Tests/SubprocessTests/WindowsTests.swift @@ -214,11 +214,12 @@ extension SubprocessWindowsTests { self.cmdExe, // This command will intentionally hang arguments: ["/c", "type con"], + input: .none, + output: .discarded, error: .discarded - ) { subprocess, standardOutput in + ) { subprocess in // Make sure we can kill the hung process try subprocess.terminate(withExitCode: 42) - for try await _ in standardOutput {} } // If we got here, the process was terminated guard case .exited(let exitCode) = stuckProcess.terminationStatus else { @@ -233,8 +234,10 @@ extension SubprocessWindowsTests { self.cmdExe, // This command will intentionally hang arguments: ["/c", "type con"], + input: .none, + output: .discarded, error: .discarded - ) { subprocess, standardOutput in + ) { subprocess in try subprocess.suspend() // Now check the to make sure the process is actually suspended // Why not spawn another process to do that? @@ -272,7 +275,6 @@ extension SubprocessWindowsTests { // Now finally kill the process since it's intentionally hung try subprocess.terminate(withExitCode: 0) - for try await _ in standardOutput {} } #expect(stuckProcess.terminationStatus.isSuccess) } @@ -295,8 +297,10 @@ extension SubprocessWindowsTests { self.cmdExe, arguments: ["/c", "echo"], platformOptions: platformOptions, - output: .discarded - ) { execution, _ in + input: .none, + output: .discarded, + error: .discarded + ) { execution in guard AssignProcessToJobObject(hJob, execution.processIdentifier.processDescriptor) else { throw SubprocessError.WindowsError(rawValue: GetLastError()) }