-
Notifications
You must be signed in to change notification settings - Fork 55
Fix streaming read hangs by replacing persistent DispatchIO with per-read dispatch_read #254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,49 +33,81 @@ final class AsyncIO: Sendable { | |
| from diskIO: borrowing IOChannel, | ||
| upTo maxLength: Int | ||
| ) async throws(SubprocessError) -> DispatchData? { | ||
| return try await self.read( | ||
| from: diskIO.channel, | ||
| upTo: maxLength, | ||
| if maxLength == .max { | ||
| return try await self.readAll(from: diskIO, upTo: maxLength) | ||
| } | ||
| return try await self.readChunk( | ||
| fd: diskIO.channel.rawValue, | ||
| maxLength: maxLength, | ||
| ) | ||
| } | ||
|
|
||
| internal func read( | ||
| from dispatchIO: DispatchIO, | ||
| from fileDescriptor: FileDescriptor, | ||
| upTo maxLength: Int | ||
| ) async throws(SubprocessError) -> DispatchData? { | ||
| if maxLength == .max { | ||
| return try await self.readAll( | ||
| fd: fileDescriptor.rawValue, | ||
| upTo: maxLength | ||
| ) | ||
| } | ||
| return try await self.readChunk( | ||
| fd: fileDescriptor.rawValue, | ||
| maxLength: maxLength, | ||
| ) | ||
| } | ||
|
|
||
| internal func readAll( | ||
| from diskIO: borrowing IOChannel, | ||
| upTo maxLength: Int | ||
| ) async throws(SubprocessError) -> DispatchData? { | ||
| return try await self.readAll( | ||
| fd: diskIO.channel.rawValue, | ||
| upTo: maxLength | ||
| ) | ||
| } | ||
|
|
||
| private func readAll( | ||
| fd: Int32, | ||
| upTo maxLength: Int | ||
| ) async throws(SubprocessError) -> DispatchData? { | ||
| var accumulated: DispatchData = .empty | ||
| while accumulated.count < maxLength { | ||
| if Task.isCancelled { | ||
| throw SubprocessError.asyncIOFailed( | ||
| reason: "Cancelled", | ||
| underlyingError: nil | ||
| ) | ||
| } | ||
| let remaining = maxLength == .max ? .max : maxLength - accumulated.count | ||
| let chunk: DispatchData? = try await self.readChunk(fd: fd, maxLength: remaining) | ||
| guard let chunk else { break } | ||
| accumulated.append(chunk) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will break backpressure and therefore may lead to OOM crashes and uncontrollable resource usage if this is ever called with So I'm very worried about the presence of this function.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the only place where this can be called is for non-streaming output, where you can't stop the user from potentially setting output to But I agree this could be misleading, let me see if I can refactor a bit more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay thanks for the explainer, this makes sense. My suggestion would be to be stupidly obvious with the intent |
||
| } | ||
| return accumulated.isEmpty ? nil : accumulated | ||
| } | ||
|
|
||
| private func readChunk( | ||
| fd: Int32, | ||
| maxLength: Int | ||
| ) async throws(SubprocessError) -> DispatchData? { | ||
| // https://github.com/swiftlang/swift/issues/87810 | ||
| return try await _castError { | ||
| return try await withCheckedThrowingContinuation { continuation in | ||
| var buffer: DispatchData = .empty | ||
| dispatchIO.read( | ||
| offset: 0, | ||
| length: maxLength, | ||
| queue: DispatchQueue(label: "SubprocessReadQueue") | ||
| ) { done, data, error in | ||
| DispatchIO.read( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How do we cancel such a read? We must be able to cancel our read, killing the child process is not enough. We also need a test for the following scenario:
AsyncProcess has a similar test case (that one tests stdin not stdout, we need both)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cancellation wasn't handled previously. Let's tackle that in a separate PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, fair enough. @jakepetroules would you be able to make a fresh bug detailing the places where we don't handle cancellation which would break under the scenario? If this is a pre-existing issue I'm happy moving forward assuming we have an actual bug for that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jakepetroules Is this about right? #256 |
||
| fromFileDescriptor: fd, | ||
| maxLength: maxLength, | ||
| runningHandlerOn: .global() | ||
| ) { data, error in | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This closure will be invoked exactly once, is that correct?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes |
||
| if error != 0 { | ||
| continuation.resume( | ||
| throwing: | ||
| SubprocessError | ||
| .failedToReadFromProcess( | ||
| withUnderlyingError: Errno(rawValue: error) | ||
| ) | ||
| ) | ||
| throwing: SubprocessError.failedToReadFromProcess( | ||
| withUnderlyingError: Errno(rawValue: error) | ||
| )) | ||
| return | ||
| } | ||
| if let data { | ||
| if buffer.isEmpty { | ||
| buffer = data | ||
| } else { | ||
| buffer.append(data) | ||
| } | ||
| } | ||
| if done { | ||
| if !buffer.isEmpty { | ||
| continuation.resume(returning: buffer) | ||
| } else { | ||
| continuation.resume(returning: nil) | ||
| } | ||
| } | ||
| continuation.resume(returning: data.isEmpty ? nil : data) | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -147,27 +179,31 @@ final class AsyncIO: Sendable { | |
| queue: DispatchQueue = .global(), | ||
| completion: @escaping (Int, SubprocessError?) -> Void | ||
| ) { | ||
| diskIO.channel.write( | ||
| offset: 0, | ||
| data: dispatchData, | ||
| queue: queue | ||
| ) { done, unwritten, error in | ||
| guard done else { | ||
| // Wait until we are done writing or encountered some error | ||
| return | ||
| } | ||
|
|
||
| let unwrittenLength = unwritten?.count ?? 0 | ||
| let writtenLength = dispatchData.count - unwrittenLength | ||
| guard error != 0 else { | ||
| completion(writtenLength, nil) | ||
| return | ||
| let fd = diskIO.channel.rawValue | ||
| var totalWritten = 0 | ||
| func writeRemaining(_ data: DispatchData) { | ||
| DispatchIO.write( | ||
| toFileDescriptor: fd, | ||
| data: data, | ||
| runningHandlerOn: queue | ||
| ) { unwritten, error in | ||
| let unwrittenLength = unwritten?.count ?? 0 | ||
| totalWritten += data.count - unwrittenLength | ||
| if error != 0 { | ||
| completion( | ||
| totalWritten, | ||
| .failedToWriteToProcess(withUnderlyingError: Errno(rawValue: error)) | ||
| ) | ||
| return | ||
| } | ||
| if let unwritten, !unwritten.isEmpty { | ||
| writeRemaining(unwritten) | ||
| } else { | ||
| completion(totalWritten, nil) | ||
| } | ||
| } | ||
| completion( | ||
| writtenLength, | ||
| .failedToWriteToProcess(withUnderlyingError: Errno(rawValue: error)) | ||
| ) | ||
| } | ||
| writeRemaining(dispatchData) | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sure reads like it will break backpressure and will potentially hoover up infinite amounts of bytes into memory. Could you explain how this is safe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The crucial invariant we need to uphold is that we never read more than a clearly defined number of bytes for every time the user calls the
try await outputStream.next()call.We must also have a test case that proves this. The way I would suggest to write this test it to launch a shell with this program
Then read a little bit, maybe 1kB or so, then sleep for half a second. After that, validate the following:
echo "$roughly_6mb"is still blocked)echo "$roughly_6mb"fail and that will cause it to exit with code 2