Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions Sources/Subprocess/AsyncBufferSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
/// The element type for the asynchronous sequence.
public typealias Element = Buffer

#if SUBPROCESS_ASYNCIO_DISPATCH
internal typealias DiskIO = DispatchIO
#elseif canImport(WinSDK)
#if canImport(WinSDK)
internal typealias DiskIO = HANDLE
#else
internal typealias DiskIO = FileDescriptor
Expand Down Expand Up @@ -74,9 +72,7 @@ public struct AsyncBufferSequence: AsyncSequence, @unchecked Sendable {
)
guard let data else {
// We finished reading. Close the file descriptor now
#if SUBPROCESS_ASYNCIO_DISPATCH
try _safelyClose(.dispatchIO(self.diskIO))
#elseif canImport(WinSDK)
#if canImport(WinSDK)
try _safelyClose(.handle(self.diskIO))
#else
try _safelyClose(.fileDescriptor(self.diskIO))
Expand Down
28 changes: 2 additions & 26 deletions Sources/Subprocess/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,6 @@ internal enum _CloseTarget {
#else
case fileDescriptor(FileDescriptor)
#endif
case dispatchIO(DispatchIO)
}

internal func _safelyClose(_ target: _CloseTarget) throws(SubprocessError) {
Expand Down Expand Up @@ -822,8 +821,6 @@ internal func _safelyClose(_ target: _CloseTarget) throws(SubprocessError) {
)
}
#endif
case .dispatchIO(let dispatchIO):
dispatchIO.close()
}
}

Expand Down Expand Up @@ -879,24 +876,7 @@ internal struct IODescriptor: ~Copyable {
consuming func createIOChannel() -> IOChannel {
let shouldClose = self.closeWhenDone
self.closeWhenDone = false
#if SUBPROCESS_ASYNCIO_DISPATCH
// Transferring out the ownership of fileDescriptor means we don't have to close it here
let closeFd = self.descriptor
let dispatchIO: DispatchIO = DispatchIO(
type: .stream,
fileDescriptor: self.platformDescriptor(),
queue: .global(),
cleanupHandler: { @Sendable error in
// Close the file descriptor
if shouldClose {
try? closeFd.close()
}
}
)
return IOChannel(dispatchIO, closeWhenDone: shouldClose)
#else
return IOChannel(self.descriptor, closeWhenDone: shouldClose)
#endif
}

internal mutating func safelyClose() throws(SubprocessError) {
Expand Down Expand Up @@ -934,9 +914,7 @@ internal struct IODescriptor: ~Copyable {
}

internal struct IOChannel: ~Copyable, @unchecked Sendable {
#if SUBPROCESS_ASYNCIO_DISPATCH
typealias Channel = DispatchIO
#elseif canImport(WinSDK)
#if canImport(WinSDK)
typealias Channel = HANDLE
#else
typealias Channel = FileDescriptor
Expand All @@ -959,9 +937,7 @@ internal struct IOChannel: ~Copyable, @unchecked Sendable {
}
closeWhenDone = false

#if SUBPROCESS_ASYNCIO_DISPATCH
try _safelyClose(.dispatchIO(self.channel))
#elseif canImport(WinSDK)
#if canImport(WinSDK)
try _safelyClose(.handle(self.channel))
#else
try _safelyClose(.fileDescriptor(self.channel))
Expand Down
134 changes: 85 additions & 49 deletions Sources/Subprocess/IO/AsyncIO+Dispatch.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,49 +33,81 @@ final class AsyncIO: Sendable {
from diskIO: borrowing IOChannel,
upTo maxLength: Int
) async throws(SubprocessError) -> DispatchData? {
return try await self.read(
from: diskIO.channel,
upTo: maxLength,
if maxLength == .max {
return try await self.readAll(from: diskIO, upTo: maxLength)
}
return try await self.readChunk(
fd: diskIO.channel.rawValue,
maxLength: maxLength,
)
}

internal func read(
from dispatchIO: DispatchIO,
from fileDescriptor: FileDescriptor,
upTo maxLength: Int
) async throws(SubprocessError) -> DispatchData? {
if maxLength == .max {
return try await self.readAll(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sure reads like it will break backpressure and will potentially hoover up infinite amounts of bytes into memory. Could you explain how this is safe?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The crucial invariant we need to uphold is that we never read more than a clearly defined number of bytes each time the user calls try await outputStream.next().

We must also have a test case that proves this. The way I would suggest writing this test is to launch a shell with this program:

set -eu

trap '' PIPE

roughly_6mb=$(echo {1..1000000})   # puts 6MB of data in this variable
for f in {1..20}; do
    echo "$f" >> /tmp/asdf
    echo "$roughly_6mb" || exit 2
done
exit 0

Then read a little bit, maybe 1kB or so, then sleep for half a second. After that, validate the following:

  • After the sleep, the subprocess is still running (because the echo "$roughly_6mb" is still blocked)
  • Then cancel the Swift Concurrency task (which will tear down the process)
  • Finally validate that the task exits in the expected way. The expected way would be
    • SIGKILL # if we killed it first
    • or exit code 2 # if we closed the reading end of the pipe first which will make the echo "$roughly_6mb" fail and that will cause it to exit with code 2

fd: fileDescriptor.rawValue,
upTo: maxLength
)
}
return try await self.readChunk(
fd: fileDescriptor.rawValue,
maxLength: maxLength,
)
}

/// Reads from the given channel by forwarding to the file-descriptor based
/// `readAll(fd:upTo:)` overload.
///
/// - Parameters:
///   - diskIO: The borrowed channel whose underlying descriptor is read from.
///   - maxLength: Upper bound on the number of bytes to accumulate.
/// - Returns: Whatever the `readAll(fd:upTo:)` overload returns for this descriptor.
/// - Throws: `SubprocessError` propagated from the underlying read.
internal func readAll(
    from diskIO: borrowing IOChannel,
    upTo maxLength: Int
) async throws(SubprocessError) -> DispatchData? {
    // Extract the raw descriptor value first so the forwarding call stays on one line.
    let rawDescriptor = diskIO.channel.rawValue
    return try await self.readAll(fd: rawDescriptor, upTo: maxLength)
}

private func readAll(
fd: Int32,
upTo maxLength: Int
) async throws(SubprocessError) -> DispatchData? {
var accumulated: DispatchData = .empty
while accumulated.count < maxLength {
if Task.isCancelled {
throw SubprocessError.asyncIOFailed(
reason: "Cancelled",
underlyingError: nil
)
}
let remaining = maxLength == .max ? .max : maxLength - accumulated.count
let chunk: DispatchData? = try await self.readChunk(fd: fd, maxLength: remaining)
guard let chunk else { break }
accumulated.append(chunk)
Copy link
Copy Markdown

@weissi weissi Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will break backpressure and therefore may lead to OOM crashes and uncontrollable resource usage if this is ever called with maxLength larger than what we want to tolerate at most.

So I'm very worried about the presence of this function.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only place where this can be called is for non-streaming output, where you can't stop the user from potentially setting output to .bytes(limit: .max).

But I agree this could be misleading, let me see if I can refactor a bit more.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, thanks for the explainer, this makes sense. My suggestion would be to make the intent stupidly obvious, e.g. readBytesContiguouslyIntoMemoryAsRequestedByUser(userRequestedMaxBytes: Int) or something equally obvious, so that we don't end up accidentally calling it elsewhere.

}
return accumulated.isEmpty ? nil : accumulated
}

private func readChunk(
fd: Int32,
maxLength: Int
) async throws(SubprocessError) -> DispatchData? {
// https://github.com/swiftlang/swift/issues/87810
return try await _castError {
return try await withCheckedThrowingContinuation { continuation in
var buffer: DispatchData = .empty
dispatchIO.read(
offset: 0,
length: maxLength,
queue: DispatchQueue(label: "SubprocessReadQueue")
) { done, data, error in
DispatchIO.read(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we cancel such a read? We must be able to cancel our read, killing the child process is not enough. We also need a test for the following scenario:

  1. We run a subprocess (process A) and stream its stdout (i.e. we create a pipe where we inherit the write end into the child)
  2. The subprocess spawns another subprocess (process B), which directly inherits the pipe end that we provided
  3. Now we cancel our subprocess, which will eventually kill process A, but B remains alive and still has our file descriptor.
  4. Then verify the expectations: despite the fact that our file descriptor's write end is still open (in process B), even if nothing more has been written, we are not hanging.

AsyncProcess has a similar test case (that one tests stdin not stdout, we need both)

    func testWeDoNotHangIfStandardInputRemainsOpenButProcessExits() async throws {
        // This tests an odd situation: The child exits but stdin is still not closed, mostly happens if we inherit a
        // pipe that we still have another writer to.

        var sleepPidToKill: CInt?
        defer {
            if let sleepPidToKill {
                self.logger.debug(
                    "killing our sleep grand-child",
                    metadata: ["pid": "\(sleepPidToKill)"]
                )
                kill(sleepPidToKill, SIGKILL)
            } else {
                XCTFail("didn't find the pid of sleep to kill")
            }
        }
        do { // We create a scope here to make sure we can leave the scope without hanging
            let (stdinStream, stdinStreamProducer) = AsyncStream.makeStream(of: ByteBuffer.self)
            let exe = ProcessExecutor(
                executable: "/bin/sh",
                [
                    "-c",
                #"""
                # This construction attempts to emulate a simple `sleep 12345678 < /dev/null` but some shells (eg. dash)
                # won't allow stdin inheritance for background processes...
                exec 2>&- # close stderr
                exec 2<&0 # duplicate stdin into fd 2 (so we can inherit it into sleep

                ( # this creates a child of our child which gets inherited our fds
                    exec 0<&2  # map the duplicated fd 2 as our stdin
                    exec 2>&-  # close the duplicated fd2
                    exec sleep 12345678 # sleep (this will now have the origin stdin as its stdin)
                ) & # uber long sleep that will inherit our stdin pipe
                exec 2>&- # close duplicated 2

                read -r line
                echo "$line" # write back the line
                echo "$!" # write back the sleep
                exec >&-
                exit 0
                """#
                ],
                standardInput: stdinStream
            )
            stdinStreamProducer.yield(ByteBuffer(string: "GO\n"))
            stdinStreamProducer.yield(ByteBuffer(repeating: 0x42, count: 16 * 1024 * 1024))
            async let resultAsync = exe.runWithExtendedInfo()
            async let stderrAsync = Array(exe.standardError)
            var stdoutLines = await exe.standardOutput.splitIntoLines().makeAsyncIterator()
            let lineGo = try await stdoutLines.next()
            XCTAssertEqual(ByteBuffer(string: "GO"), lineGo)
            let linePid = try await stdoutLines.next().map(String.init(buffer:))
            let sleepPid = try XCTUnwrap(linePid.flatMap { CInt($0) })
            self.logger.debug("found our sleep grand-child", metadata: ["pid": "\(sleepPid)"])
            sleepPidToKill = sleepPid
            let stderrBytes = try await stderrAsync
            XCTAssertEqual([], stderrBytes)
            let result = try await resultAsync
            XCTAssertEqual(.exit(0), result.exitReason)
            XCTAssertNotNil(result.standardInputWriteError)
            XCTAssertEqual(ChannelError.ioOnClosedChannel, result.standardInputWriteError as? ChannelError)
            stdinStreamProducer.finish()
        }
    }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cancellation wasn't handled previously. Let's tackle that in a separate PR.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, fair enough. @jakepetroules would you be able to file a fresh bug detailing the places where we don't handle cancellation and which would break under this scenario? If this is a pre-existing issue, I'm happy to move forward assuming we have an actual bug filed for it.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jakepetroules Is this about right? #256

fromFileDescriptor: fd,
maxLength: maxLength,
runningHandlerOn: .global()
) { data, error in
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This closure will be invoked exactly once, is that correct?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

if error != 0 {
continuation.resume(
throwing:
SubprocessError
.failedToReadFromProcess(
withUnderlyingError: Errno(rawValue: error)
)
)
throwing: SubprocessError.failedToReadFromProcess(
withUnderlyingError: Errno(rawValue: error)
))
return
}
if let data {
if buffer.isEmpty {
buffer = data
} else {
buffer.append(data)
}
}
if done {
if !buffer.isEmpty {
continuation.resume(returning: buffer)
} else {
continuation.resume(returning: nil)
}
}
continuation.resume(returning: data.isEmpty ? nil : data)
}
}
}
Expand Down Expand Up @@ -147,27 +179,31 @@ final class AsyncIO: Sendable {
queue: DispatchQueue = .global(),
completion: @escaping (Int, SubprocessError?) -> Void
) {
diskIO.channel.write(
offset: 0,
data: dispatchData,
queue: queue
) { done, unwritten, error in
guard done else {
// Wait until we are done writing or encountered some error
return
}

let unwrittenLength = unwritten?.count ?? 0
let writtenLength = dispatchData.count - unwrittenLength
guard error != 0 else {
completion(writtenLength, nil)
return
let fd = diskIO.channel.rawValue
var totalWritten = 0
func writeRemaining(_ data: DispatchData) {
DispatchIO.write(
toFileDescriptor: fd,
data: data,
runningHandlerOn: queue
) { unwritten, error in
let unwrittenLength = unwritten?.count ?? 0
totalWritten += data.count - unwrittenLength
if error != 0 {
completion(
totalWritten,
.failedToWriteToProcess(withUnderlyingError: Errno(rawValue: error))
)
return
}
if let unwritten, !unwritten.isEmpty {
writeRemaining(unwritten)
} else {
completion(totalWritten, nil)
}
}
completion(
writtenLength,
.failedToWriteToProcess(withUnderlyingError: Errno(rawValue: error))
)
}
writeRemaining(dispatchData)
}
}

Expand Down
51 changes: 49 additions & 2 deletions Sources/Subprocess/IO/AsyncIO+Linux.swift
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,55 @@ extension AsyncIO {
from diskIO: borrowing IOChannel,
upTo maxLength: Int
) async throws(SubprocessError) -> [UInt8]? {
return try await self.read(from: diskIO.channel, upTo: maxLength)
if maxLength == .max {
return try await self._readAll(from: diskIO.channel, upTo: maxLength)
}
return try await self._readChunk(from: diskIO.channel, upTo: maxLength)
}

/// Reads from `fileDescriptor`, choosing the read strategy from `maxLength`.
///
/// A `maxLength` of `.max` is routed to `_readAll`; any other value is routed
/// to `_readChunk`.
///
/// - Parameters:
///   - fileDescriptor: The descriptor to read from.
///   - maxLength: The maximum number of bytes requested by the caller.
/// - Returns: The bytes produced by the selected helper.
/// - Throws: `SubprocessError` propagated from the selected helper.
func read(
    from fileDescriptor: FileDescriptor,
    upTo maxLength: Int
) async throws(SubprocessError) -> [UInt8]? {
    guard maxLength == .max else {
        // Bounded request: hand off to the chunked read path.
        return try await self._readChunk(from: fileDescriptor, upTo: maxLength)
    }
    return try await self._readAll(from: fileDescriptor, upTo: maxLength)
}

/// Reads from the given channel by forwarding its descriptor to `_readAll`.
///
/// - Parameters:
///   - diskIO: The borrowed channel whose descriptor is read from.
///   - maxLength: The maximum number of bytes to read.
/// - Returns: The bytes returned by `_readAll`.
/// - Throws: `SubprocessError` propagated from `_readAll`.
func readAll(
    from diskIO: borrowing IOChannel,
    upTo maxLength: Int
) async throws(SubprocessError) -> [UInt8]? {
    let bytes = try await self._readAll(
        from: diskIO.channel,
        upTo: maxLength
    )
    return bytes
}

/// Forwards to `_read` with `returnOnFirstRead` enabled, so the read returns
/// as soon as a read delivers some data.
///
/// - Parameters:
///   - fileDescriptor: The descriptor to read from.
///   - maxLength: The maximum number of bytes to read.
/// - Returns: The bytes returned by `_read`.
/// - Throws: `SubprocessError` propagated from `_read`.
private func _readChunk(
    from fileDescriptor: FileDescriptor,
    upTo maxLength: Int
) async throws(SubprocessError) -> [UInt8]? {
    let stopAfterFirstRead = true
    return try await self._read(from: fileDescriptor, upTo: maxLength, returnOnFirstRead: stopAfterFirstRead)
}

/// Forwards to `_read` with `returnOnFirstRead` disabled, so the read does not
/// take the early-return path after the first successful read.
///
/// - Parameters:
///   - fileDescriptor: The descriptor to read from.
///   - maxLength: The maximum number of bytes to read.
/// - Returns: The bytes returned by `_read`.
/// - Throws: `SubprocessError` propagated from `_read`.
private func _readAll(
    from fileDescriptor: FileDescriptor,
    upTo maxLength: Int
) async throws(SubprocessError) -> [UInt8]? {
    let stopAfterFirstRead = false
    return try await self._read(from: fileDescriptor, upTo: maxLength, returnOnFirstRead: stopAfterFirstRead)
}

private func _read(
from fileDescriptor: FileDescriptor,
upTo maxLength: Int,
returnOnFirstRead: Bool
) async throws(SubprocessError) -> [UInt8]? {
guard maxLength > 0 else {
return nil
Expand Down Expand Up @@ -375,7 +418,11 @@ extension AsyncIO {
if bytesRead > 0 {
// Read some data
readLength += bytesRead
if maxLength == .max {
if returnOnFirstRead {
try self.removeRegistration(for: fileDescriptor)
resultBuffer.removeLast(resultBuffer.count - readLength)
return resultBuffer
} else if maxLength == .max {
// Grow resultBuffer if needed
guard Double(readLength) > 0.8 * Double(resultBuffer.count) else {
continue
Expand Down
50 changes: 48 additions & 2 deletions Sources/Subprocess/IO/AsyncIO+Windows.swift
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,55 @@ final class AsyncIO: @unchecked Sendable {
from diskIO: borrowing IOChannel,
upTo maxLength: Int
) async throws(SubprocessError) -> [UInt8]? {
return try await self.read(from: diskIO.channel, upTo: maxLength)
if maxLength == .max {
return try await self._readAll(from: diskIO.channel, upTo: maxLength)
}
return try await self._readChunk(from: diskIO.channel, upTo: maxLength)
}

/// Reads from `handle`, choosing the read strategy from `maxLength`.
///
/// A `maxLength` of `.max` is routed to `_readAll`; any other value is routed
/// to `_readChunk`.
///
/// - Parameters:
///   - handle: The handle to read from.
///   - maxLength: The maximum number of bytes requested by the caller.
/// - Returns: The bytes produced by the selected helper.
/// - Throws: `SubprocessError` propagated from the selected helper.
func read(
    from handle: HANDLE,
    upTo maxLength: Int
) async throws(SubprocessError) -> [UInt8]? {
    guard maxLength == .max else {
        // Bounded request: hand off to the chunked read path.
        return try await self._readChunk(from: handle, upTo: maxLength)
    }
    return try await self._readAll(from: handle, upTo: maxLength)
}

/// Reads from the given channel by forwarding its handle to `_readAll`.
///
/// - Parameters:
///   - diskIO: The borrowed channel whose handle is read from.
///   - maxLength: The maximum number of bytes to read.
/// - Returns: The bytes returned by `_readAll`.
/// - Throws: `SubprocessError` propagated from `_readAll`.
func readAll(
    from diskIO: borrowing IOChannel,
    upTo maxLength: Int
) async throws(SubprocessError) -> [UInt8]? {
    let bytes = try await self._readAll(
        from: diskIO.channel,
        upTo: maxLength
    )
    return bytes
}

/// Forwards to `_read` with `returnOnFirstRead` enabled, so the read returns
/// as soon as a read delivers some data.
///
/// - Parameters:
///   - handle: The handle to read from.
///   - maxLength: The maximum number of bytes to read.
/// - Returns: The bytes returned by `_read`.
/// - Throws: `SubprocessError` propagated from `_read`.
private func _readChunk(
    from handle: HANDLE,
    upTo maxLength: Int
) async throws(SubprocessError) -> [UInt8]? {
    let stopAfterFirstRead = true
    return try await self._read(from: handle, upTo: maxLength, returnOnFirstRead: stopAfterFirstRead)
}

/// Forwards to `_read` with `returnOnFirstRead` disabled, so the read does not
/// take the early-return path after the first successful read.
///
/// - Parameters:
///   - handle: The handle to read from.
///   - maxLength: The maximum number of bytes to read.
/// - Returns: The bytes returned by `_read`.
/// - Throws: `SubprocessError` propagated from `_read`.
private func _readAll(
    from handle: HANDLE,
    upTo maxLength: Int
) async throws(SubprocessError) -> [UInt8]? {
    let stopAfterFirstRead = false
    return try await self._read(from: handle, upTo: maxLength, returnOnFirstRead: stopAfterFirstRead)
}

private func _read(
from handle: HANDLE,
upTo maxLength: Int,
returnOnFirstRead: Bool
) async throws(SubprocessError) -> [UInt8]? {
guard maxLength > 0 else {
return nil
Expand Down Expand Up @@ -333,7 +376,10 @@ final class AsyncIO: @unchecked Sendable {
} else {
// Read some data
readLength += Int(truncatingIfNeeded: bytesRead)
if maxLength == .max {
if returnOnFirstRead {
resultBuffer.removeLast(resultBuffer.count - readLength)
return resultBuffer
} else if maxLength == .max {
// Grow resultBuffer if needed
guard Double(readLength) > 0.8 * Double(resultBuffer.count) else {
continue
Expand Down
Loading
Loading