Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 60 additions & 40 deletions Sources/SwiftCommand/FileHandle+Async.swift
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
import Foundation

fileprivate final actor BufferActor {
private let buffer: UnsafeMutableRawBufferPointer

init(buffer: UnsafeMutableRawBufferPointer) {
self.buffer = buffer
}
}

fileprivate final actor IOActor {
#if !os(Windows)
#if !os(Windows)
fileprivate func read(
from fd: Int32,
into buffer: UnsafeMutableRawBufferPointer
) async throws -> Int {
while true {
#if canImport(Darwin)
#if canImport(Darwin)
let read = Darwin.read
#elseif canImport(Glibc)
#elseif canImport(Glibc)
let read = Glibc.read
#else
#error("Unsupported platform!")
#endif
#else
#error("Unsupported platform!")
#endif
let amount = read(fd, buffer.baseAddress, buffer.count)
if amount >= 0 {
return amount
Expand All @@ -28,44 +36,46 @@ fileprivate final actor IOActor {
}
}
}
#endif
#endif

private func _read(
from handle: FileHandle,
upToCount count: Int
) async throws -> Data? {
if #available(macOS 10.15.4, *) {
try? handle.read(upToCount: count)
} else {
handle.readData(ofLength: count)
}
}

fileprivate func read(
from handle: FileHandle,
into buffer: UnsafeMutableRawBufferPointer
) async throws -> Int {
upToCount count: Int
) async throws -> Data? {
try await withUnsafeThrowingContinuation { continuation in
handle.readabilityHandler = { handle in
handle.readabilityHandler = nil

let data: Data
if #available(macOS 10.15.4, *) {
Task.init {
do {
guard let _data =
try handle.read(upToCount: buffer.count) else {
continuation.resume(returning: 0)
return
}

data = _data
continuation.resume(
returning: try await self._read(
from: handle,
upToCount: count
)
)
} catch {
continuation.resume(throwing: error)
return
}
} else {
data = handle.readData(ofLength: buffer.count)
}

data.copyBytes(to: buffer)
continuation.resume(returning: data.count)
}
}
}

fileprivate static let `default` = IOActor()
}


@usableFromInline
internal struct _AsyncBytesBuffer {
private struct Header {
Expand Down Expand Up @@ -94,9 +104,10 @@ internal struct _AsyncBytesBuffer {
}

fileprivate var baseAddress: UnsafeMutableRawPointer {
(self.storage as! Storage).withUnsafeMutablePointerToElements {
.init($0)
}
(self.storage as! Storage)
.withUnsafeMutablePointerToElements {
.init($0)
}
}

fileprivate var capacity: Int {
Expand Down Expand Up @@ -174,9 +185,9 @@ extension FileHandle {
fileprivate init(file: FileHandle) {
self._buffer = _AsyncBytesBuffer(_capacity: Self.bufferSize)

#if !os(Windows)
#if !os(Windows)
let fileDescriptor = file.fileDescriptor
#endif
#endif

self._buffer.readFunction = { buf in
buf._nextPointer = buf.baseAddress
Expand All @@ -188,25 +199,34 @@ extension FileHandle {
count: capacity
)

#if os(Windows)
let readSize = try await IOActor.default.read(
#if os(Windows)
let readSize: Int
if let data = try await IOActor.default.read(
from: file,
into: bufPtr
)
#else
upToCount: bufPtr.count
) {
data.copyBytes(to: bufPtr)
readSize = data.count
} else {
readSize = 0
}
#else
let readSize: Int
if fileDescriptor >= 0 {
readSize = try await IOActor.default.read(
from: fileDescriptor,
into: bufPtr
)
} else if let data = try await IOActor.default.read(
from: file,
upToCount: bufPtr.count
) {
data.copyBytes(to: bufPtr)
readSize = data.count
} else {
readSize = try await IOActor.default.read(
from: file,
into: bufPtr
)
readSize = 0
}
#endif
#endif

buf._endPointer = buf._nextPointer + readSize
return readSize
Expand Down