Skip to content
28 changes: 13 additions & 15 deletions Examples/ExampleMiddleware/HTTPServerLoggingMiddleware.swift
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ where
public struct RequestBodyAsyncReader: AsyncReader, ~Copyable {
public typealias ReadElement = UInt8
public typealias ReadFailure = Base.Underlying.ReadFailure
public typealias Buffer = Base.Underlying.Buffer

private var underlying: Base.Underlying
private let logger: Logger
Expand All @@ -160,16 +161,13 @@ where
self.logger = logger
}

public mutating func read<Return, Failure>(
maximumCount: Int?,
body: (consuming Span<UInt8>) async throws(Failure) -> Return
public mutating func read<Return: ~Copyable, Failure>(
body: (inout Buffer) async throws(Failure) -> Return
) async throws(EitherError<Base.Underlying.ReadFailure, Failure>) -> Return {
let logger = self.logger
return try await self.underlying.read(
maximumCount: maximumCount
) { (span: Span<UInt8>) async throws(Failure) -> Return in
logger.info("Received next chunk \(span.count)")
return try await body(span)
return try await self.underlying.read { (buffer: inout Buffer) async throws(Failure) -> Return in
logger.info("Received next chunk \(buffer.count)")
return try await body(&buffer)
}
}
}
Expand Down Expand Up @@ -219,6 +217,7 @@ where
public struct ResponseBodyAsyncWriter: AsyncWriter, ~Copyable {
public typealias WriteElement = UInt8
public typealias WriteFailure = Base.Underlying.WriteFailure
public typealias Buffer = Base.Underlying.Buffer

private var underlying: Base.Underlying
private let logger: Logger
Expand All @@ -228,14 +227,13 @@ where
self.logger = logger
}

public mutating func write<Result, Failure>(
_ body: (inout OutputSpan<UInt8>) async throws(Failure) -> Result
public mutating func write<Result: ~Copyable, Failure>(
_ body: (inout Buffer) async throws(Failure) -> Result
) async throws(EitherError<Base.Underlying.WriteFailure, Failure>) -> Result {
return try await self.underlying.write { (outputSpan: inout OutputSpan<UInt8>) async throws(Failure) -> Result in
defer {
self.logger.info("Wrote response bytes \(outputSpan.count)")
}
return try await body(&outputSpan)
return try await self.underlying.write { (buffer: inout Buffer) async throws(Failure) -> Result in
let result = try await body(&buffer)
self.logger.info("Wrote response bytes \(buffer.count)")
return result
}
}
}
Expand Down
37 changes: 34 additions & 3 deletions Examples/ProxyServer/ProxyServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ struct ProxyServer {
}

static func proxy(server: some HTTPServer, client: some HTTPClient) async throws {
try await server.serve { request, requestContext, serverRequestBodyAndTrailers, responseSender in
try await server.serve {
request,
requestContext,
serverRequestBodyAndTrailers,
responseSender in
// We need to use a mutex here to move the requestBodyAndTrailers into the
// @Sendable restartable body
let serverRequestBodyAndTrailers = Mutex(Optional(serverRequestBodyAndTrailers))
let serverRequestBodyAndTrailers = Mutex(Disconnected(value: Optional(serverRequestBodyAndTrailers)))
// Needed since we are lacking call-once closures
var responseSender = Optional(responseSender)

Expand All @@ -41,7 +45,9 @@ struct ProxyServer {
var clientRequestBody = clientRequestBody
// This takes the request body out of the mutex. Any restarts would hit
// a force-unwrap.
let serverRequestBodyAndTrailers = serverRequestBodyAndTrailers.withLock { $0.take()! }
let serverRequestBodyAndTrailers = serverRequestBodyAndTrailers.withLock {
$0.swap(newValue: nil)
}!

return try await serverRequestBodyAndTrailers.consumeAndConclude { serverRequestBody in
try await clientRequestBody.write(serverRequestBody)
Expand All @@ -62,3 +68,28 @@ struct ProxyServer {
}
}
}

@usableFromInline
struct Disconnected<Value: ~Copyable>: ~Copyable, Sendable {
// This is safe since we take the value as sending and take consumes it
// and returns it as sending.
private nonisolated(unsafe) var value: Value?

@usableFromInline
init(value: consuming sending Value) {
unsafe self.value = .some(value)
}

@usableFromInline
consuming func take() -> sending Value {
nonisolated(unsafe) let value = unsafe self.value.take()!
return unsafe value
}

@usableFromInline
mutating func swap(newValue: consuming sending Value) -> sending Value {
nonisolated(unsafe) let value = unsafe self.value.take()!
unsafe self.value = consume newValue
return unsafe value
}
}
20 changes: 7 additions & 13 deletions Examples/WASMClient/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
//===----------------------------------------------------------------------===//

import AsyncStreaming
import BasicContainers
import ContainersPreview
import FetchHTTPClient
import Foundation
import HTTPAPIs
Expand Down Expand Up @@ -91,20 +93,12 @@ do {

var reader = reader
status.set("⏳ Read \(bytes.count) bytes")
while true {
let shouldContinue = try await reader.read(maximumCount: nil) { span in
if span.isEmpty {
return false
}
for i in span.indices {
bytes.append(span[i])
}
status.set("⏳ Read \(bytes.count) bytes")
return true
}
if !shouldContinue {
break
try await reader.forEachBuffer { buffer in
var consumer = buffer.consumeAll()
while let b = consumer.next() {
bytes.append(b)
}
status.set("⏳ Read \(bytes.count) bytes")
}
return bytes
}
Expand Down
40 changes: 13 additions & 27 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ let package = Package(
.library(name: "HTTPClient", targets: ["HTTPClient"]),
.library(name: "URLSessionHTTPClient", targets: ["URLSessionHTTPClient"]),
.library(name: "AHCHTTPClient", targets: ["AHCHTTPClient"]),
.library(name: "AsyncStreaming", targets: ["AsyncStreaming"]),
.library(name: "NetworkTypes", targets: ["NetworkTypes"]),
.library(name: "Middleware", targets: ["Middleware"]),
.library(name: "HTTPClientConformance", targets: ["HTTPClientConformance"]),
Expand All @@ -40,12 +39,13 @@ let package = Package(
],
dependencies: [
.package(
url: "https://github.com/FranzBusch/swift-collections.git",
branch: "fb-async"
url: "https://github.com/apple/swift-collections.git",
from: "1.5.0"
),
.package(
url: "https://github.com/apple/swift-async-algorithms.git",
from: "1.1.2"
revision: "d0b4a06d0f173a2f3be27d3ea21b3c3aa18db440",
traits: ["UnstableAsyncStreaming"]
),
.package(url: "https://github.com/apple/swift-http-types.git", from: "1.5.1"),
.package(url: "https://github.com/apple/swift-certificates.git", from: "1.16.0"),
Expand All @@ -62,7 +62,7 @@ let package = Package(
.target(
name: "HTTPAPIs",
dependencies: [
"AsyncStreaming",
.product(name: "AsyncStreaming", package: "swift-async-algorithms"),
"NetworkTypes",
.product(name: "HTTPTypes", package: "swift-http-types"),
],
Expand All @@ -81,16 +81,6 @@ let package = Package(
name: "NetworkTypes",
swiftSettings: extraSettings
),
.target(
name: "AsyncStreaming",
dependencies: [
.product(
name: "BasicContainers",
package: "swift-collections"
)
],
swiftSettings: extraSettings
),
.target(
name: "Middleware",
swiftSettings: extraSettings
Expand All @@ -99,7 +89,7 @@ let package = Package(
name: "AHCHTTPClient",
dependencies: [
"HTTPAPIs",
"AsyncStreaming",
.product(name: "AsyncStreaming", package: "swift-async-algorithms"),
"NetworkTypes",
.product(name: "HTTPTypes", package: "swift-http-types"),
.product(name: "HTTPTypesFoundation", package: "swift-http-types"),
Expand All @@ -113,7 +103,7 @@ let package = Package(
name: "URLSessionHTTPClient",
dependencies: [
"HTTPAPIs",
"AsyncStreaming",
.product(name: "AsyncStreaming", package: "swift-async-algorithms"),
"NetworkTypes",
.product(name: "HTTPTypes", package: "swift-http-types"),
.product(name: "HTTPTypesFoundation", package: "swift-http-types"),
Expand All @@ -129,7 +119,7 @@ let package = Package(
"HTTPClient",
// These dependencies are needed by the `swift-http-server` that
// we borrowed.
"AsyncStreaming",
.product(name: "AsyncStreaming", package: "swift-async-algorithms"),
.product(name: "DequeModule", package: "swift-collections"),
.product(name: "BasicContainers", package: "swift-collections"),
.product(name: "X509", package: "swift-certificates"),
Expand Down Expand Up @@ -161,18 +151,11 @@ let package = Package(
],
swiftSettings: extraSettings
),
.testTarget(
name: "AsyncStreamingTests",
dependencies: [
"AsyncStreaming"
],
swiftSettings: extraSettings
),
.testTarget(
name: "HTTPAPIsTests",
dependencies: [
"HTTPAPIs",
"AsyncStreaming",
.product(name: "AsyncStreaming", package: "swift-async-algorithms"),
"NetworkTypes",
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
],
Expand Down Expand Up @@ -287,7 +270,8 @@ if enableWASM {
name: "FetchHTTPClient",
dependencies: [
"HTTPAPIs",
"AsyncStreaming",
.product(name: "AsyncStreaming", package: "swift-async-algorithms"),
.product(name: "BasicContainers", package: "swift-collections"),
.product(name: "HTTPTypes", package: "swift-http-types"),
.product(
name: "JavaScriptKit",
Expand All @@ -306,6 +290,8 @@ if enableWASM {
name: "WASMClient",
dependencies: [
"FetchHTTPClient",
.product(name: "BasicContainers", package: "swift-collections"),
.product(name: "ContainersPreview", package: "swift-collections"),
.product(
name: "JavaScriptKit",
package: "JavaScriptKit",
Expand Down
Loading
Loading