Add StreamingBehavior to AsyncBufferSequence for throughput/latency control#228
dannys42 wants to merge 3 commits into
Hi @dannys42, thanks for the PR! I do like the idea of
Thanks @iCharlesHu! I can try to take a look at Linux, but I don't really have a way of testing Windows.
Since #259 was merged to rewrite the I/O backends and resolve the correctness issues with buffering, I think this change is likely no longer relevant - I don't think we have a realistic throughput/latency tradeoff to make. @dannys42 given the current state of the repository do you think you still have an enhancement to propose here, or could this be closed?
Oh cool. Let me try out the new version and see if it resolves this issue.
@jakepetroules Trying 0.4, I still end up having to put
Here's a simple test program:

```swift
import Foundation
import Subprocess

@main
struct ExampleApp {
    static func main() async throws {
        let target = CommandLine.arguments.count > 1
            ? CommandLine.arguments[1]
            : "./test_timing"
        let stdout = FileHandle.standardOutput
        let result = try await Subprocess.run(
            .path(.init(target)),
            preferredBufferSize: 1 // <--- comment out this line to see the problem
        ) { _, outputSequence in
            for try await line in outputSequence.lines() {
                stdout.write(Data(line.utf8))
            }
        }
        print("exit:", result.terminationStatus)
    }
}
```

Also compile this as

```c
#include <stdio.h>
#include <unistd.h>
#include <time.h>

void print_time(const char* label) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    fprintf(stderr, "[%s] %ld.%09ld\n", label, ts.tv_sec, ts.tv_nsec);
    fflush(stderr);
}

int main(void) {
    print_time("start");
    fprintf(stdout, "line 1\n");
    fflush(stdout);
    print_time("after line 1");
    sleep(1);
    fprintf(stdout, "line 2\n");
    fflush(stdout);
    print_time("after line 2");
    sleep(2);
    fprintf(stdout, "line 3\n");
    fflush(stdout);
    print_time("after line 3");
    return 0;
}
```
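To compare when each line reaches the parent with the timestamps the C program writes to stderr, the receiving side can stamp lines on arrival. This is only a sketch: `ArrivalStamper` and `stamp(_:)` are hypothetical names, not part of Subprocess, and the helper uses wall-clock `Date` purely for illustration.

```swift
import Foundation

/// Hypothetical helper (not part of Subprocess): prefixes each received line
/// with the time elapsed since the stamper was created.
struct ArrivalStamper {
    /// Captured once, when the stamper is created.
    let start = Date()

    /// Returns `line` prefixed with the elapsed time in seconds.
    func stamp(_ line: String) -> String {
        let elapsed = Date().timeIntervalSince(start)
        return "[+" + String(format: "%.3f", elapsed) + "s] " + line
    }
}
```

In the test program above, creating the stamper before `Subprocess.run` and writing `Data(stamper.stamp(line).utf8)` in place of `Data(line.utf8)` would show whether the three lines arrive roughly one and two seconds apart or all together at exit.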
Oh wait... I just realized the fix you mentioned is in |
Yes, the fix seems to resolve the issue for me. Thank you! |
Motivation
When streaming subprocess output, users previously had no control over the trade-off between throughput and latency. The sequence always batched data for maximum throughput, which limits its usefulness in interactive cases where users want to receive output as soon as it's available.
Changes
- `AsyncBufferSequence.StreamingBehavior` enum with three modes:
  - `.throughput` (default): batch data for best performance (preserves the existing behavior)
  - `.balanced`: batch data with guaranteed 250ms max delivery interval
  - `.latency`: deliver data as soon as it's available (also bounded by 250ms)
- `streamingBehavior` parameter to all `run` overloads that expose an `AsyncBufferSequence`, defaulting to `.throughput` for backwards compatibility
- `.latency` and `.balanced` behaviors
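
The three modes listed above can be pictured with a small standalone sketch. It is not the PR's implementation: the enum is declared at top level rather than nested in `AsyncBufferSequence`, and `maxDeliveryIntervalMilliseconds`, `shouldFlush`, `pendingBytes`, and `bufferCapacity` are hypothetical names used only to illustrate how the 250ms bound and immediate delivery might combine.

```swift
/// Illustrative stand-in for the enum described above (assumed shape).
enum StreamingBehavior {
    case throughput
    case balanced
    case latency

    /// Longest time buffered data may wait before delivery; nil means unbounded.
    /// The 250 ms figure comes from the PR description.
    var maxDeliveryIntervalMilliseconds: Int? {
        switch self {
        case .throughput: return nil
        case .balanced, .latency: return 250
        }
    }
}

/// Hypothetical flush decision: returns true when buffered data should be
/// handed to the consumer now.
func shouldFlush(
    behavior: StreamingBehavior,
    pendingBytes: Int,
    bufferCapacity: Int,
    elapsedMilliseconds: Int
) -> Bool {
    // Nothing buffered, nothing to deliver.
    guard pendingBytes > 0 else { return false }
    // A full buffer is always delivered, whatever the mode.
    if pendingBytes >= bufferCapacity { return true }
    // `.latency` delivers as soon as any data is available.
    if behavior == .latency { return true }
    // `.balanced` delivers once the bounded interval has elapsed.
    if let limit = behavior.maxDeliveryIntervalMilliseconds {
        return elapsedMilliseconds >= limit
    }
    // `.throughput` keeps batching until the buffer fills.
    return false
}
```

Under these assumptions, `.throughput` holds a partial buffer indefinitely, while `.balanced` releases it after at most 250 ms and `.latency` releases it immediately.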