diff --git a/Sources/Subprocess/API.swift b/Sources/Subprocess/API.swift index df52ef93..461fcb59 100644 --- a/Sources/Subprocess/API.swift +++ b/Sources/Subprocess/API.swift @@ -166,6 +166,8 @@ public func run< /// as the default buffer size. Larger buffer sizes may improve performance for /// subprocesses that produce large amounts of output, while smaller buffer sizes /// may reduce memory usage and improve responsiveness for interactive applications. +/// - streamingBehavior: Controls the trade-off between throughput and latency when +/// streaming output from the subprocess. Defaults to `.throughput`. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process. /// - Returns: an `ExecutableResult` type containing the return value of the closure. @@ -178,6 +180,7 @@ public func run( input: Input = .none, error: Error = .discarded, preferredBufferSize: Int? = nil, + streamingBehavior: AsyncBufferSequence.StreamingBehavior = .throughput, isolation: isolated (any Actor)? = #isolation, body: ((Execution, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionOutcome where Error.OutputType == Void { @@ -193,6 +196,7 @@ public func run( input: input, error: error, preferredBufferSize: preferredBufferSize, + streamingBehavior: streamingBehavior, isolation: isolation, body: body ) @@ -213,6 +217,8 @@ public func run( /// as the default buffer size. Larger buffer sizes may improve performance for /// subprocesses that produce large amounts of output, while smaller buffer sizes /// may reduce memory usage and improve responsiveness for interactive applications. +/// - streamingBehavior: Controls the trade-off between throughput and latency when +/// streaming output from the subprocess. Defaults to `.throughput`. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process /// - Returns: an `ExecutableResult` type containing the return value of the closure. @@ -225,6 +231,7 @@ public func run( input: Input = .none, output: Output, preferredBufferSize: Int? = nil, + streamingBehavior: AsyncBufferSequence.StreamingBehavior = .throughput, isolation: isolated (any Actor)? = #isolation, body: ((Execution, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionOutcome where Output.OutputType == Void { @@ -240,6 +247,7 @@ public func run( input: input, output: output, preferredBufferSize: preferredBufferSize, + streamingBehavior: streamingBehavior, isolation: isolation, body: body ) @@ -259,6 +267,8 @@ public func run( /// as the default buffer size. Larger buffer sizes may improve performance for /// subprocesses that produce large amounts of output, while smaller buffer sizes /// may reduce memory usage and improve responsiveness for interactive applications. +/// - streamingBehavior: Controls the trade-off between throughput and latency when +/// streaming output from the subprocess. Defaults to `.throughput`. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process /// - Returns: An `ExecutableResult` type containing the return value of the closure. @@ -270,6 +280,7 @@ public func run( platformOptions: PlatformOptions = PlatformOptions(), error: Error = .discarded, preferredBufferSize: Int? = nil, + streamingBehavior: AsyncBufferSequence.StreamingBehavior = .throughput, isolation: isolated (any Actor)? = #isolation, body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionOutcome where Error.OutputType == Void { @@ -284,6 +295,7 @@ public func run( configuration, error: error, preferredBufferSize: preferredBufferSize, + streamingBehavior: streamingBehavior, isolation: isolation, body: body ) @@ -303,6 +315,8 @@ public func run( /// as the default buffer size. Larger buffer sizes may improve performance for /// subprocesses that produce large amounts of output, while smaller buffer sizes /// may reduce memory usage and improve responsiveness for interactive applications. +/// - streamingBehavior: Controls the trade-off between throughput and latency when +/// streaming output from the subprocess. Defaults to `.throughput`. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process /// - Returns: An `ExecutableResult` type containing the return value of the closure. @@ -314,6 +328,7 @@ public func run( platformOptions: PlatformOptions = PlatformOptions(), output: Output, preferredBufferSize: Int? = nil, + streamingBehavior: AsyncBufferSequence.StreamingBehavior = .throughput, isolation: isolated (any Actor)? = #isolation, body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionOutcome where Output.OutputType == Void { @@ -328,6 +343,7 @@ public func run( configuration, output: output, preferredBufferSize: preferredBufferSize, + streamingBehavior: streamingBehavior, isolation: isolation, body: body ) @@ -347,6 +363,8 @@ public func run( /// as the default buffer size. Larger buffer sizes may improve performance for /// subprocesses that produce large amounts of output, while smaller buffer sizes /// may reduce memory usage and improve responsiveness for interactive applications. +/// - streamingBehavior: Controls the trade-off between throughput and latency when +/// streaming output from the subprocess. Defaults to `.throughput`. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process /// - Returns: an `ExecutableResult` type containing the return value of the closure. @@ -357,6 +375,7 @@ public func run( workingDirectory: FilePath? = nil, platformOptions: PlatformOptions = PlatformOptions(), preferredBufferSize: Int? = nil, + streamingBehavior: AsyncBufferSequence.StreamingBehavior = .throughput, isolation: isolated (any Actor)? = #isolation, body: ( ( @@ -377,6 +396,7 @@ public func run( return try await run( configuration, preferredBufferSize: preferredBufferSize, + streamingBehavior: streamingBehavior, isolation: isolation, body: body ) @@ -615,6 +635,8 @@ public func run< /// as the default buffer size. Larger buffer sizes may improve performance for /// subprocesses that produce large amounts of output, while smaller buffer sizes /// may reduce memory usage and improve responsiveness for interactive applications. +/// - streamingBehavior: Controls the trade-off between throughput and latency when +/// streaming output from the subprocess. Defaults to `.throughput`. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process /// - Returns an executableResult type containing the return value @@ -628,6 +650,7 @@ public func run< input: Input = .none, error: Error = .discarded, preferredBufferSize: Int? = nil, + streamingBehavior: AsyncBufferSequence.StreamingBehavior = .throughput, isolation: isolated (any Actor)? = #isolation, body: ((Execution, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionOutcome where Error.OutputType == Void { @@ -660,7 +683,8 @@ public func run< // Body runs in the same isolation let outputSequence = AsyncBufferSequence( diskIO: outputIOBox.take()!.consumeIOChannel(), - preferredBufferSize: preferredBufferSize + preferredBufferSize: preferredBufferSize, + streamingBehavior: streamingBehavior ) let result = try await body(execution, outputSequence) @@ -681,6 +705,8 @@ public func run< /// as the default buffer size. Larger buffer sizes may improve performance for /// subprocesses that produce large amounts of output, while smaller buffer sizes /// may reduce memory usage and improve responsiveness for interactive applications. +/// - streamingBehavior: Controls the trade-off between throughput and latency when +/// streaming output from the subprocess. Defaults to `.throughput`. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process /// - Returns an executableResult type containing the return value @@ -690,6 +716,7 @@ public func run( input: Input = .none, output: Output, preferredBufferSize: Int? = nil, + streamingBehavior: AsyncBufferSequence.StreamingBehavior = .throughput, isolation: isolated (any Actor)? = #isolation, body: ((Execution, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionOutcome where Output.OutputType == Void { @@ -719,7 +746,8 @@ public func run( // Body runs in the same isolation let errorSequence = AsyncBufferSequence( diskIO: errorIOBox.take()!.consumeIOChannel(), - preferredBufferSize: preferredBufferSize + preferredBufferSize: preferredBufferSize, + streamingBehavior: streamingBehavior ) let result = try await body(execution, errorSequence) @@ -740,6 +768,8 @@ public func run( /// as the default buffer size. Larger buffer sizes may improve performance for /// subprocesses that produce large amounts of output, while smaller buffer sizes /// may reduce memory usage and improve responsiveness for interactive applications. +/// - streamingBehavior: Controls the trade-off between throughput and latency when +/// streaming output from the subprocess. Defaults to `.throughput`. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process /// - Returns an executableResult type containing the return value @@ -748,6 +778,7 @@ public func run( _ configuration: Configuration, error: Error = .discarded, preferredBufferSize: Int? = nil, + streamingBehavior: AsyncBufferSequence.StreamingBehavior = .throughput, isolation: isolated (any Actor)? = #isolation, body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionOutcome where Error.OutputType == Void { @@ -765,7 +796,8 @@ public func run( let writer = StandardInputWriter(diskIO: inputIO!) let outputSequence = AsyncBufferSequence( diskIO: outputIO!.consumeIOChannel(), - preferredBufferSize: preferredBufferSize + preferredBufferSize: preferredBufferSize, + streamingBehavior: streamingBehavior ) return try await body(execution, writer, outputSequence) @@ -783,6 +815,8 @@ public func run( /// as the default buffer size. Larger buffer sizes may improve performance for /// subprocesses that produce large amounts of output, while smaller buffer sizes /// may reduce memory usage and improve responsiveness for interactive applications. +/// - streamingBehavior: Controls the trade-off between throughput and latency when +/// streaming output from the subprocess. Defaults to `.throughput`. /// - isolation: the isolation context to run the body closure. /// - body: The custom execution body to manually control the running process /// - Returns an executableResult type containing the return value @@ -791,6 +825,7 @@ public func run( _ configuration: Configuration, output: Output, preferredBufferSize: Int? = nil, + streamingBehavior: AsyncBufferSequence.StreamingBehavior = .throughput, isolation: isolated (any Actor)? = #isolation, body: ((Execution, StandardInputWriter, AsyncBufferSequence) async throws -> Result) ) async throws -> ExecutionOutcome where Output.OutputType == Void { @@ -805,7 +840,8 @@ public func run( let writer = StandardInputWriter(diskIO: inputIO!) let errorSequence = AsyncBufferSequence( diskIO: errorIO!.consumeIOChannel(), - preferredBufferSize: preferredBufferSize + preferredBufferSize: preferredBufferSize, + streamingBehavior: streamingBehavior ) return try await body(execution, writer, errorSequence) } @@ -821,6 +857,8 @@ public func run( /// as the default buffer size. Larger buffer sizes may improve performance for /// subprocesses that produce large amounts of output, while smaller buffer sizes /// may reduce memory usage and improve responsiveness for interactive applications. +/// - streamingBehavior: Controls the trade-off between throughput and latency when +/// streaming output from the subprocess. Defaults to `.throughput`. /// - isolation: the isolation context to run the body closure. /// - body: The custom configuration body to manually control /// the running process, write to its standard input, stream @@ -829,6 +867,7 @@ public func run( public func run( _ configuration: Configuration, preferredBufferSize: Int? = nil, + streamingBehavior: AsyncBufferSequence.StreamingBehavior = .throughput, isolation: isolated (any Actor)? = #isolation, body: ( ( @@ -851,11 +890,13 @@ public func run( let writer = StandardInputWriter(diskIO: inputIO!) let outputSequence = AsyncBufferSequence( diskIO: outputIO!.consumeIOChannel(), - preferredBufferSize: preferredBufferSize + preferredBufferSize: preferredBufferSize, + streamingBehavior: streamingBehavior ) let errorSequence = AsyncBufferSequence( diskIO: errorIO!.consumeIOChannel(), - preferredBufferSize: preferredBufferSize + preferredBufferSize: preferredBufferSize, + streamingBehavior: streamingBehavior ) return try await body(execution, writer, outputSequence, errorSequence) } diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index 5e9b13d4..4708c8c8 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -34,6 +34,24 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { internal typealias DiskIO = FileDescriptor #endif + /// Controls the trade-off between throughput and latency when streaming + /// output from a subprocess. + public enum StreamingBehavior: Sendable, Hashable { + /// Optimize for throughput by batching data before delivery. + /// Data is delivered when the buffer fills or the stream ends. + /// This provides the best performance for high-volume output. + case throughput + /// Balance throughput and latency by batching data but guaranteeing + /// delivery within a maximum interval (250ms). + /// This is suitable for most interactive use cases. + case balanced + /// Optimize for latency by delivering data as soon as it's available, + /// with guaranteed delivery within 250ms. + /// This provides the most responsive output but may have higher overhead + /// for high-volume streams. + case latency + } + /// Iterator for `AsyncBufferSequence`. @_nonSendable public struct Iterator: AsyncIteratorProtocol { @@ -42,12 +60,20 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { private let diskIO: DiskIO private let preferredBufferSize: Int + private let returnOnFirstData: Bool private var buffer: [Buffer] + #if SUBPROCESS_ASYNCIO_DISPATCH + private let pendingState: PendingReadState? + #endif - internal init(diskIO: DiskIO, preferredBufferSize: Int?) { + internal init(diskIO: DiskIO, preferredBufferSize: Int?, returnOnFirstData: Bool = false) { self.diskIO = diskIO self.buffer = [] self.preferredBufferSize = preferredBufferSize ?? readBufferSize + self.returnOnFirstData = returnOnFirstData + #if SUBPROCESS_ASYNCIO_DISPATCH + self.pendingState = returnOnFirstData ? PendingReadState() : nil + #endif } /// Retrieve the next buffer in the sequence, or `nil` if @@ -58,10 +84,19 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { return self.buffer.removeFirst() } // Read more data + #if SUBPROCESS_ASYNCIO_DISPATCH + let data = try await AsyncIO.shared.read( + from: self.diskIO, + upTo: self.preferredBufferSize, + returnOnFirstData: self.returnOnFirstData, + pendingState: self.pendingState + ) + #else let data = try await AsyncIO.shared.read( from: self.diskIO, upTo: self.preferredBufferSize ) + #endif guard let data else { // We finished reading. Close the file descriptor now #if SUBPROCESS_ASYNCIO_DISPATCH @@ -87,17 +122,52 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable { private let diskIO: DiskIO private let preferredBufferSize: Int? + private let returnOnFirstData: Bool - internal init(diskIO: DiskIO, preferredBufferSize: Int?) { + internal init( + diskIO: DiskIO, + preferredBufferSize: Int?, + streamingBehavior: StreamingBehavior = .throughput + ) { self.diskIO = diskIO self.preferredBufferSize = preferredBufferSize + + #if SUBPROCESS_ASYNCIO_DISPATCH + // Configure DispatchIO based on streaming behavior + switch streamingBehavior { + case .throughput: + // Default behavior - no changes needed + self.returnOnFirstData = false + case .balanced: + // Use default buffer size but guarantee delivery within 250ms + diskIO.setInterval( + interval: .milliseconds(250), + flags: .strictInterval + ) + self.returnOnFirstData = false + case .latency: + // Deliver data as soon as any bytes are available + // Setting lowWater to 0 combined with strictInterval allows + // the handler to be called even with partial data + diskIO.setLimit(lowWater: 0) + // Use a short interval to get data quickly + diskIO.setInterval( + interval: .milliseconds(250), + flags: .strictInterval + ) + self.returnOnFirstData = true + } + #else + self.returnOnFirstData = false + #endif } /// Creates a iterator for this asynchronous sequence. public func makeAsyncIterator() -> Iterator { return Iterator( diskIO: self.diskIO, - preferredBufferSize: self.preferredBufferSize + preferredBufferSize: self.preferredBufferSize, + returnOnFirstData: self.returnOnFirstData ) } diff --git a/Sources/Subprocess/IO/AsyncIO+Dispatch.swift b/Sources/Subprocess/IO/AsyncIO+Dispatch.swift index 30d2ec21..d45295a2 100644 --- a/Sources/Subprocess/IO/AsyncIO+Dispatch.swift +++ b/Sources/Subprocess/IO/AsyncIO+Dispatch.swift @@ -22,6 +22,57 @@ import SystemPackage internal import Dispatch +/// Tracks an in-flight DispatchIO read that was resumed early due to +/// `returnOnFirstData`. After the continuation returns the first chunk, +/// the DispatchIO handler may still deliver more data for the same +/// `read()` call. This actor funnels that data through an AsyncStream so +/// the next iterator `next()` call can retrieve it without starting an +/// overlapping read. +actor PendingReadState { + /// True while a DispatchIO operation is still delivering data after + /// the continuation was resumed early. Accessed nonisolated because + /// it is set from the DispatchIO handler and read from the async caller. + nonisolated(unsafe) private var _hasPendingOperation: Bool = false + + private let asyncStream: AsyncStream + nonisolated private let continuation: AsyncStream.Continuation + + init() { + let (stream, continuation) = AsyncStream.makeStream(of: DispatchData?.self) + self.asyncStream = stream + self.continuation = continuation + } + + var hasPendingOperation: Bool { + _hasPendingOperation + } + + nonisolated func beginPendingOperation() { + _hasPendingOperation = true + } + + nonisolated func append(_ newData: DispatchData) { + continuation.yield(newData) + } + + nonisolated func markOperationDone() { + continuation.yield(nil) + } + + /// Returns the next chunk of pending data, or `nil` when the in-flight + /// operation is complete (caller should issue a new DispatchIO read). + func asyncTakeData() async -> DispatchData? { + for await data in asyncStream { + if let data { + return data + } + _hasPendingOperation = false + return nil + } + return nil + } +} + final class AsyncIO: Sendable { static let shared: AsyncIO = AsyncIO() @@ -41,35 +92,76 @@ final class AsyncIO: Sendable { internal func read( from dispatchIO: DispatchIO, - upTo maxLength: Int + upTo maxLength: Int, + returnOnFirstData: Bool = false, + pendingState: PendingReadState? = nil ) async throws(SubprocessError) -> DispatchData? { + // If a prior DispatchIO read is still delivering data (because we + // returned early on the first chunk), drain from the pending state + // instead of starting a new overlapping read. + if let pendingState, await pendingState.hasPendingOperation { + if let data = await pendingState.asyncTakeData() { + return data + } + // The previous operation completed with no more pending data. + // Fall through to start a fresh read. + } + // https://github.com/swiftlang/swift/issues/87810 return try await _castError { return try await withCheckedThrowingContinuation { continuation in var buffer: DispatchData = .empty + var hasResumed = false + dispatchIO.read( offset: 0, length: maxLength, queue: DispatchQueue(label: "SubprocessReadQueue") ) { done, data, error in + if let data, !data.isEmpty { + if hasResumed, let pendingState { + pendingState.append(data) + } else { + if buffer.isEmpty { + buffer = data + } else { + buffer.append(data) + } + } + } + + if done, hasResumed { + pendingState?.markOperationDone() + return + } + + guard !hasResumed else { + return + } + if error != 0 { + hasResumed = true continuation.resume( - throwing: - SubprocessError + throwing: SubprocessError .failedToReadFromProcess( withUnderlyingError: Errno(rawValue: error) ) ) return } - if let data { - if buffer.isEmpty { - buffer = data - } else { - buffer.append(data) + + if returnOnFirstData && !buffer.isEmpty { + hasResumed = true + pendingState?.beginPendingOperation() + if done { + pendingState?.markOperationDone() } + continuation.resume(returning: buffer) + return } + if done { + hasResumed = true if !buffer.isEmpty { continuation.resume(returning: buffer) } else { diff --git a/Tests/SubprocessTests/StreamingBehaviorTests.swift b/Tests/SubprocessTests/StreamingBehaviorTests.swift new file mode 100644 index 00000000..2090b22b --- /dev/null +++ b/Tests/SubprocessTests/StreamingBehaviorTests.swift @@ -0,0 +1,839 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift.org open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if canImport(System) +@preconcurrency import System +#else +@preconcurrency import SystemPackage +#endif + +import Testing +import Dispatch +import Foundation +@testable import Subprocess + +@Suite("AsyncBufferSequence.StreamingBehavior Tests", .serialized) +struct StreamingBehaviorTests {} + +// MARK: - StreamingBehavior Enum Tests +extension StreamingBehaviorTests { + @Test func testStreamingBehaviorEquality() { + // Test that all three streaming behavior values are distinct + let throughput: AsyncBufferSequence.StreamingBehavior = .throughput + let balanced: AsyncBufferSequence.StreamingBehavior = .balanced + let latency: AsyncBufferSequence.StreamingBehavior = .latency + + // Each value should be equal to itself + #expect(throughput == .throughput) + #expect(balanced == .balanced) + #expect(latency == .latency) + + // Each value should be different from the others + #expect(throughput != balanced) + #expect(throughput != latency) + #expect(balanced != latency) + } + + @Test func testStreamingBehaviorHashable() { + // Test that streaming behaviors can be used in sets and dictionaries + let behaviors: Set = [ + .throughput, .balanced, .latency + ] + #expect(behaviors.count == 3) + #expect(behaviors.contains(.throughput)) + #expect(behaviors.contains(.balanced)) + #expect(behaviors.contains(.latency)) + } +} + +// MARK: - Throughput Mode Tests +extension StreamingBehaviorTests { + /// Test that throughput mode successfully streams output from a subprocess. + @Test func testThroughputModeBasicFunctionality() async throws { + let expected = "Hello from throughput mode" + #if os(Windows) + let result = try await Subprocess.run( + .name("cmd.exe"), + arguments: ["/c", "echo \(expected)"], + error: .discarded, + streamingBehavior: .throughput + ) { execution, standardOutput in + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + } + #else + let result = try await Subprocess.run( + .path("/bin/sh"), + arguments: ["-c", "echo '\(expected)'"], + error: .discarded, + streamingBehavior: .throughput + ) { execution, standardOutput in + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + } + #endif + + #expect(result.terminationStatus.isSuccess) + #expect(result.value == expected) + } + + /// Test that throughput mode can handle larger amounts of data. + @Test func testThroughputModeWithLargeOutput() async throws { + let lineCount = 1000 + #if os(Windows) + let script = (1...lineCount).map { "echo Line \($0)" }.joined(separator: " & ") + let result = try await Subprocess.run( + .name("cmd.exe"), + arguments: ["/c", script], + error: .discarded, + streamingBehavior: .throughput + ) { execution, standardOutput in + var count = 0 + for try await _ in standardOutput.lines() { + count += 1 + } + return count + } + #else + let result = try await Subprocess.run( + .path("/bin/sh"), + arguments: ["-c", "for i in $(seq 1 \(lineCount)); do echo \"Line $i\"; done"], + error: .discarded, + streamingBehavior: .throughput + ) { execution, standardOutput in + var count = 0 + for try await _ in standardOutput.lines() { + count += 1 + } + return count + } + #endif + + #expect(result.terminationStatus.isSuccess) + #expect(result.value == lineCount) + } +} + +// MARK: - Balanced Mode Tests +extension StreamingBehaviorTests { + /// Test that balanced mode successfully streams output from a subprocess. + @Test func testBalancedModeBasicFunctionality() async throws { + let expected = "Hello from balanced mode" + #if os(Windows) + let result = try await Subprocess.run( + .name("cmd.exe"), + arguments: ["/c", "echo \(expected)"], + error: .discarded, + streamingBehavior: .balanced + ) { execution, standardOutput in + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + } + #else + let result = try await Subprocess.run( + .path("/bin/sh"), + arguments: ["-c", "echo '\(expected)'"], + error: .discarded, + streamingBehavior: .balanced + ) { execution, standardOutput in + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + } + #endif + + #expect(result.terminationStatus.isSuccess) + #expect(result.value == expected) + } + + /// Test that balanced mode can handle larger amounts of data. + @Test func testBalancedModeWithLargeOutput() async throws { + let lineCount = 1000 + #if os(Windows) + let script = (1...lineCount).map { "echo Line \($0)" }.joined(separator: " & ") + let result = try await Subprocess.run( + .name("cmd.exe"), + arguments: ["/c", script], + error: .discarded, + streamingBehavior: .balanced + ) { execution, standardOutput in + var count = 0 + for try await _ in standardOutput.lines() { + count += 1 + } + return count + } + #else + let result = try await Subprocess.run( + .path("/bin/sh"), + arguments: ["-c", "for i in $(seq 1 \(lineCount)); do echo \"Line $i\"; done"], + error: .discarded, + streamingBehavior: .balanced + ) { execution, standardOutput in + var count = 0 + for try await _ in standardOutput.lines() { + count += 1 + } + return count + } + #endif + + #expect(result.terminationStatus.isSuccess) + #expect(result.value == lineCount) + } + + /// Test that balanced mode can handle multiple lines of data that arrive with delays. + @Test func testBalancedModeWithDelayedOutput() async throws { + #if os(Windows) + let result = try await Subprocess.run( + .name("cmd.exe"), + arguments: ["/c", "echo Line1 & timeout /t 1 >nul & echo Line2"], + error: .discarded, + streamingBehavior: .balanced + ) { execution, standardOutput in + var lines: [String] = [] + for try await line in standardOutput.lines() { + lines.append(line.trimmingNewLineAndQuotes()) + } + return lines + } + #else + let result = try await Subprocess.run( + .path("/bin/sh"), + arguments: [ + "-c", + """ + echo "Line1" + sleep 0.1 + echo "Line2" + sleep 0.1 + echo "Line3" + """, + ], + error: .discarded, + streamingBehavior: .balanced + ) { execution, standardOutput in + var lines: [String] = [] + for try await line in standardOutput.lines() { + lines.append(line.trimmingNewLineAndQuotes()) + } + return lines + } + #endif + + #expect(result.terminationStatus.isSuccess) + #if os(Windows) + #expect(result.value.contains("Line1")) + #expect(result.value.contains("Line2")) + #else + #expect(result.value.count == 3, "Expected 3 lines, got \(result.value.count): \(result.value)") + #expect(result.value[0] == "Line1") + #expect(result.value[1] == "Line2") + #expect(result.value[2] == "Line3") + #endif + } +} + +// MARK: - Latency Mode Tests +extension StreamingBehaviorTests { + /// Test that latency mode successfully streams output from a subprocess. + @Test func testLatencyModeBasicFunctionality() async throws { + let expected = "Hello from latency mode" + #if os(Windows) + let result = try await Subprocess.run( + .name("cmd.exe"), + arguments: ["/c", "echo \(expected)"], + error: .discarded, + streamingBehavior: .latency + ) { execution, standardOutput in + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + } + #else + let result = try await Subprocess.run( + .path("/bin/sh"), + arguments: ["-c", "echo '\(expected)'"], + error: .discarded, + streamingBehavior: .latency + ) { execution, standardOutput in + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + } + #endif + + #expect(result.terminationStatus.isSuccess) + #expect(result.value == expected) + } + + /// Test that latency mode can handle multiple lines of data that arrive with delays. + /// This test is more representative of interactive use cases where latency mode shines. + @Test func testLatencyModeWithDelayedOutput() async throws { + #if os(Windows) + // On Windows, use a simpler approach + let result = try await Subprocess.run( + .name("cmd.exe"), + arguments: ["/c", "echo Line1 & timeout /t 1 >nul & echo Line2"], + error: .discarded, + streamingBehavior: .latency + ) { execution, standardOutput in + var lines: [String] = [] + for try await line in standardOutput.lines() { + lines.append(line.trimmingNewLineAndQuotes()) + } + return lines + } + #else + let result = try await Subprocess.run( + .path("/bin/sh"), + arguments: [ + "-c", + """ + echo "Line1" + sleep 0.1 + echo "Line2" + sleep 0.1 + echo "Line3" + """, + ], + error: .discarded, + preferredBufferSize: 1, + streamingBehavior: .latency + ) { execution, standardOutput in + var lines: [String] = [] + for try await line in standardOutput.lines() { + lines.append(line.trimmingNewLineAndQuotes()) + } + return lines + } + #endif + + #expect(result.terminationStatus.isSuccess) + #if os(Windows) + #expect(result.value.contains("Line1")) + #expect(result.value.contains("Line2")) + #else + #expect(result.value.count == 3, "Expected 3 lines, got \(result.value.count): \(result.value)") + #expect(result.value[0] == "Line1") + #expect(result.value[1] == "Line2") + #expect(result.value[2] == "Line3") + #endif + } + + #if !os(Windows) + /// Test that latency mode delivers the first line before a long sleep completes. + /// This simulates a script that outputs a line, sleeps, then outputs a final line. + /// With latency mode, the first line must arrive well before the sleep ends — + /// not buffered until process exit. + @Test func testLatencyModeFirstLineArrivesBeforeLongDelay() async throws { + let start = Date() + var firstLineTimestamp: Date? = nil + + let result = try await Subprocess.run( + .path("/bin/sh"), + arguments: [ + "-c", + """ + echo "first" + sleep 1 + echo "last" + """, + ], + error: .discarded, + streamingBehavior: .latency + ) { execution, standardOutput in + var lines: [String] = [] + for try await line in standardOutput.lines() { + if firstLineTimestamp == nil { + firstLineTimestamp = Date() + } + lines.append(line.trimmingCharacters(in: .whitespacesAndNewlines)) + } + return lines + } + + #expect(result.terminationStatus.isSuccess) + #expect(result.value == ["first", "last"]) + + // The first line must arrive well before the 1-second sleep completes. + // If output were buffered until process exit, it would arrive after ~1s. + if let firstLineTimestamp { + let elapsed = firstLineTimestamp.timeIntervalSince(start) + #expect( + elapsed < 0.5, + "First line should arrive within 0.5s, but took \(elapsed)s. Output may be buffered until process exit." + ) + } else { + Issue.record("No lines were received") + } + } + + /// Test that latency mode delivers data quickly for interactive use cases. + /// This test verifies that when a subprocess outputs data incrementally, + /// latency mode receives it without waiting for a full buffer. + @Test func testLatencyModeDeliversDataQuickly() async throws { + // This test outputs a single small line and then waits. + // With latency mode, we should receive the data quickly without + // waiting for the buffer to fill. + let result = try await Subprocess.run( + .path("/bin/sh"), + arguments: [ + "-c", + """ + echo "first" + sleep 0.5 + echo "second" + """, + ], + error: .discarded, + preferredBufferSize: 1, + streamingBehavior: .latency + ) { execution, standardOutput in + var lines: [String] = [] + var timestamps: [Date] = [] + + for try await line in standardOutput.lines() { + lines.append(line.trimmingNewLineAndQuotes()) + timestamps.append(Date()) + } + + return (lines: lines, timestamps: timestamps) + } + + #expect(result.terminationStatus.isSuccess) + #expect(result.value.lines.count == 2) + #expect(result.value.lines[0] == "first") + #expect(result.value.lines[1] == "second") + + // Verify that we received the lines at different times + // (the second line should come ~0.5 seconds after the first) + if result.value.timestamps.count == 2 { + let timeDiff = result.value.timestamps[1].timeIntervalSince(result.value.timestamps[0]) + // The time difference should be around 0.5 seconds (allowing some tolerance) + #expect(timeDiff >= 0.3, "Second line should arrive after a delay") + } + } + + /// Test that latency mode receives all subsequent chunks after the first chunk. + /// This test reproduces a bug where subsequent chunks arriving after the first + /// chunk has been returned are ignored (the handler sets hasResumed=true and + /// ignores later data). + /// + /// The bug occurs when DispatchIO calls the handler multiple times for a single + /// read() call. With latency mode and returnOnFirstData=true, the first chunk + /// causes an early return and sets hasResumed=true. If DispatchIO calls the handler + /// again with more data (with done=false) for that same read() operation, that + /// data is lost because hasResumed is already true and the guard at line 62-65 + /// causes the handler to return early, ignoring the data. + /// + /// This test outputs data slowly to allow DispatchIO's interval-based handler + /// to be called multiple times with partial data. With latency mode's strictInterval + /// and lowWater=0, the handler should be called at each interval with whatever + /// data is available. After returning the first chunk, subsequent handler calls + /// should not be ignored. + @Test func testLatencyModeReceivesAllSubsequentChunks() async throws { + // Output data slowly with small writes to trigger DispatchIO's interval-based + // handler calls. With latency mode's 250ms interval and lowWater=0, the handler + // should be called multiple times as data arrives, even for a single read() call. + let result = try await Subprocess.run( + .path("/bin/sh"), + arguments: [ + "-c", + """ + printf "A" + sleep 0.05 + printf "B" + sleep 0.05 + printf "C" + sleep 0.05 + printf "D" + sleep 0.05 + printf "E" + """, + ], + error: .discarded, + preferredBufferSize: 100, // Large enough buffer to read all data in one call + streamingBehavior: .latency + ) { execution, standardOutput in + var allData = Data() + var chunkCount = 0 + for try await buffer in standardOutput { + chunkCount += 1 + allData.append(contentsOf: buffer.data) + } + let receivedString = String(decoding: allData, as: UTF8.self) + return (data: receivedString, chunkCount: chunkCount) + } + + #expect(result.terminationStatus.isSuccess) + + // Verify we received all the data. If the bug exists, we might only receive + // the first chunk ("A") and lose subsequent chunks that arrive in handler + // calls after the first chunk was returned. + let expectedOutput = "ABCDE" + #expect( + result.value.data == expectedOutput, + "Expected '\(expectedOutput)', got '\(result.value.data)'. This indicates subsequent chunks after the first were ignored. Received \(result.value.chunkCount) chunks. The bug causes handler calls after the first chunk to be ignored when hasResumed=true." + ) + + // Verify we received at least one chunk + #expect( + result.value.chunkCount > 0, + "Should receive at least one chunk, got \(result.value.chunkCount)" + ) + } + #endif +} + +// MARK: - Comparison Tests +extension StreamingBehaviorTests { + /// Test that all streaming behaviors produce the same output for the same input. + @Test func testAllBehaviorsProduceSameOutput() async throws { + let expected = "Test output content" + + #if os(Windows) + let executable: Subprocess.Executable = .name("cmd.exe") + let arguments: Subprocess.Arguments = ["/c", "echo \(expected)"] + #else + let executable: Subprocess.Executable = .path("/bin/sh") + let arguments: Subprocess.Arguments = ["-c", "echo '\(expected)'"] + #endif + + // Run with throughput mode + let throughputResult = try await Subprocess.run( + executable, + arguments: arguments, + error: .discarded, + streamingBehavior: .throughput + ) { execution, standardOutput in + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + } + + // Run with balanced mode + let balancedResult = try await Subprocess.run( + executable, + arguments: arguments, + error: .discarded, + streamingBehavior: .balanced + ) { execution, standardOutput in + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + } + + // Run with latency mode + let latencyResult = try await Subprocess.run( + executable, + arguments: arguments, + error: .discarded, + streamingBehavior: .latency + ) { execution, standardOutput in + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + } + + // All modes should produce the same output + #expect(throughputResult.terminationStatus.isSuccess) + #expect(balancedResult.terminationStatus.isSuccess) + #expect(latencyResult.terminationStatus.isSuccess) + + #expect(throughputResult.value == expected) + #expect(balancedResult.value == expected) + #expect(latencyResult.value == expected) + } + + /// Test streaming behavior with binary data. + @Test func testStreamingBehaviorWithBinaryData() async throws { + let behaviors: [AsyncBufferSequence.StreamingBehavior] = [.throughput, .balanced, .latency] + + for behavior in behaviors { + #if os(Windows) + // Windows: Use PowerShell to output binary bytes + let result = try await Subprocess.run( + .name("powershell.exe"), + arguments: [ + "-Command", + "[byte[]]@(0x48,0x65,0x6C,0x6C,0x6F) | ForEach-Object { [Console]::OpenStandardOutput().WriteByte($_) }", + ], + error: .discarded, + streamingBehavior: behavior + ) { execution, standardOutput in + var bytes: [UInt8] = [] + for try await buffer in standardOutput { + buffer.withUnsafeBytes { ptr in + bytes.append(contentsOf: ptr) + } + } + return bytes + } + #else + // Unix: Output "Hello" as bytes using printf + let result = try await Subprocess.run( + .path("/bin/sh"), + arguments: ["-c", "printf 'Hello'"], + error: .discarded, + streamingBehavior: behavior + ) { execution, standardOutput in + var bytes: [UInt8] = [] + for try await buffer in standardOutput { + buffer.withUnsafeBytes { ptr in + bytes.append(contentsOf: ptr) + } + } + return bytes + } + #endif + + #expect(result.terminationStatus.isSuccess, "Behavior \(behavior) should succeed") + #expect(result.value == Array("Hello".utf8), "Behavior \(behavior) should produce correct output") + } + } +} + +// MARK: - Edge Cases +extension StreamingBehaviorTests { + /// Test streaming behavior with empty output. + @Test func testStreamingBehaviorWithEmptyOutput() async throws { + let behaviors: [AsyncBufferSequence.StreamingBehavior] = [.throughput, .balanced, .latency] + + for behavior in behaviors { + #if os(Windows) + let result = try await Subprocess.run( + .name("cmd.exe"), + arguments: ["/c", ""], + error: .discarded, + streamingBehavior: behavior + ) { execution, standardOutput in + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output + } + #else + let result = try await Subprocess.run( + .path("/bin/sh"), + arguments: ["-c", ""], + error: .discarded, + streamingBehavior: behavior + ) { execution, standardOutput in + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output + } + #endif + + #expect(result.terminationStatus.isSuccess, "Behavior \(behavior) should succeed with empty output") + #expect(result.value.isEmpty, "Behavior \(behavior) should produce empty output") + } + } + + /// Test streaming behavior with both stdout and stderr. + @Test func testStreamingBehaviorWithBothStreams() async throws { + let behaviors: [AsyncBufferSequence.StreamingBehavior] = [.throughput, .balanced, .latency] + + for behavior in behaviors { + #if os(Windows) + let result = try await Subprocess.run( + .name("cmd.exe"), + arguments: ["/c", "echo stdout & echo stderr 1>&2"], + streamingBehavior: behavior + ) { execution, inputWriter, standardOutput, standardError in + try await inputWriter.finish() + + async let stdoutTask: String = { + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + }() + + async let stderrTask: String = { + var output = "" + for try await line in standardError.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + }() + + return (stdout: try await stdoutTask, stderr: try await stderrTask) + } + #else + let result = try await Subprocess.run( + .path("/bin/sh"), + arguments: ["-c", "echo stdout; echo stderr 1>&2"], + streamingBehavior: behavior + ) { execution, inputWriter, standardOutput, standardError in + try await inputWriter.finish() + + async let stdoutTask: String = { + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + }() + + async let stderrTask: String = { + var output = "" + for try await line in standardError.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + }() + + return (stdout: try await stdoutTask, stderr: try await stderrTask) + } + #endif + + #expect(result.terminationStatus.isSuccess, "Behavior \(behavior) should succeed") + #expect(result.value.stdout == "stdout", "Behavior \(behavior) should capture stdout") + #expect(result.value.stderr == "stderr", "Behavior \(behavior) should capture stderr") + } + } + + /// Test streaming behavior with a process that terminates quickly. + @Test func testStreamingBehaviorWithQuickTermination() async throws { + let behaviors: [AsyncBufferSequence.StreamingBehavior] = [.throughput, .balanced, .latency] + + for behavior in behaviors { + #if os(Windows) + let result = try await Subprocess.run( + .name("cmd.exe"), + arguments: ["/c", "exit 0"], + error: .discarded, + streamingBehavior: behavior + ) { execution, standardOutput in + var hasOutput = false + for try await _ in standardOutput { + hasOutput = true + } + return hasOutput + } + #else + let result = try await Subprocess.run( + .path("/bin/sh"), + arguments: ["-c", "exit 0"], + error: .discarded, + streamingBehavior: behavior + ) { execution, standardOutput in + var hasOutput = false + for try await _ in standardOutput { + hasOutput = true + } + return hasOutput + } + #endif + + #expect(result.terminationStatus.isSuccess, "Behavior \(behavior) should succeed") + #expect(!result.value, "Behavior \(behavior) should produce no output") + } + } +} + +// MARK: - Default Behavior Tests +extension StreamingBehaviorTests { + /// Test that the default streaming behavior is .throughput. + @Test func testDefaultStreamingBehaviorIsThroughput() async throws { + let expected = "Default behavior test" + + #if os(Windows) + // Run without specifying streamingBehavior (should use default) + let defaultResult = try await Subprocess.run( + .name("cmd.exe"), + arguments: ["/c", "echo \(expected)"], + error: .discarded + ) { execution, standardOutput in + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + } + + // Run with explicit .throughput + let explicitResult = try await Subprocess.run( + .name("cmd.exe"), + arguments: ["/c", "echo \(expected)"], + error: .discarded, + streamingBehavior: .throughput + ) { execution, standardOutput in + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + } + #else + // Run without specifying streamingBehavior (should use default) + let defaultResult = try await Subprocess.run( + .path("/bin/sh"), + arguments: ["-c", "echo '\(expected)'"], + error: .discarded + ) { execution, standardOutput in + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + } + + // Run with explicit .throughput + let explicitResult = try await Subprocess.run( + .path("/bin/sh"), + arguments: ["-c", "echo '\(expected)'"], + error: .discarded, + streamingBehavior: .throughput + ) { execution, standardOutput in + var output = "" + for try await line in standardOutput.lines() { + output += line + } + return output.trimmingNewLineAndQuotes() + } + #endif + + // Both should succeed and produce the same output + #expect(defaultResult.terminationStatus.isSuccess) + #expect(explicitResult.terminationStatus.isSuccess) + #expect(defaultResult.value == expected) + #expect(explicitResult.value == expected) + } +} +