Skip to content

Add StreamingBehavior to AsyncBufferSequence for throughput/latency control#228

Closed
dannys42 wants to merge 3 commits into
swiftlang:mainfrom
dannys42-contrib:FEATURE-202602-streaming_behavior
Closed

Add StreamingBehavior to AsyncBufferSequence for throughput/latency control#228
dannys42 wants to merge 3 commits into
swiftlang:mainfrom
dannys42-contrib:FEATURE-202602-streaming_behavior

Conversation

@dannys42
Copy link
Copy Markdown

Motivation

When streaming subprocess output, users previously had no control over the trade-off between throughput and latency. The sequence always batched data for maximum throughput, which limits use for interactive cases where users want to receive output as soon as it's available.

Changes

  • Add AsyncBufferSequence.StreamingBehavior enum with three modes:
    • .throughput (default): batch data for best performance (preserves the existing behavior)
    • .balanced: batch data with guaranteed 250ms max delivery interval
    • .latency: deliver data as soon as it's available (also bounded by 250ms)
  • Add streamingBehavior parameter to all run overloads that expose an AsyncBufferSequence, defaulting to .throughput for backwards compatibility
  • Add unit tests verifying that data is delivered progressively under .latency and .balanced behaviors

@dannys42 dannys42 requested a review from iCharlesHu as a code owner March 20, 2026 23:36
@iCharlesHu
Copy link
Copy Markdown
Contributor

Hi @dannys42, thanks for the PR! I do like the idea of StreamingBehavior, and I have a few questions:

  • How does it work with the existing preferredBufferSize parameter? Are you proposing we replace it? preferredBufferSize was introduced to serve a similar purpose. Developers can increase or decrease the buffer size based on how frequently they want the data to be delivered.
  • Subprocess is a cross-platform package. I see in your implementation you only included the Dispatch implementation. Can you make sure this proposed API can be implemented on Windows and Linux as well? We don't want to have platform-specific APIs outside of PlatformOptions, and StreamingBehavior doesn't belong in PlatformOptions. Can the notion of batch data with guaranteed 250ms max delivery interval be efficiently implemented on top of Linux epoll and Windows IO Completion Ports?

@dannys42
Copy link
Copy Markdown
Author

Thanks @iCharlesHu!

I can try to take a look at Linux, but I don't really have a way of testing Windows.

preferredBufferSize doesn't resolve this problem because the dispatchIO.read() call only resumes when done=true, which only happens when we've read length (i.e. preferredBufferSize) bytes or get EOF. So even with a value of 1, it will hold onto that last byte.

@jakepetroules
Copy link
Copy Markdown
Contributor

Since #259 was merged to rewrite the I/O backends and resolve the correctness issues with buffering, I think this change is likely no longer relevant - I don't think we have a realistic throughput/latency tradeoff to make.

@dannys42 given the current state of the repository do you think you still have an enhancement to propose here, or could this be closed?

@dannys42
Copy link
Copy Markdown
Author

Oh cool. Let me try out the new version and see if it resolves this issue.

@dannys42
Copy link
Copy Markdown
Author

dannys42 commented May 15, 2026

@jakepetroules Trying 0.4, I still end up having to put preferredBufferSize=1 in order to get true streaming behavior, which is an unfortunate performance hit. Without preferredBufferSize=1, the program will not output anything until either the program terminates or there's a page (16kB) worth of data in the stream.

Here's a simple test program:

import Foundation
import Subprocess

@main
struct ExampleApp {

    static func main() async throws {

        let target = CommandLine.arguments.count > 1
            ? CommandLine.arguments[1]
            : "./test_timing"

        let stdout = FileHandle.standardOutput

        let result = try await Subprocess.run(
            .path(.init(target)),
            preferredBufferSize: 1 // <--- comment out this line to see the problem
        ) { _, outputSequence in
            for try await line in outputSequence.lines() {
                stdout.write(Data(line.utf8))
            }
        }

        print("exit:", result.terminationStatus)
    }
}

Also compile this as test_timing:

#include <stdio.h>
#include <unistd.h>
#include <time.h>

void print_time(const char* label) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    fprintf(stderr, "[%s] %ld.%09ld\n", label, ts.tv_sec, ts.tv_nsec);
    fflush(stderr);
}

int main(void) {
    print_time("start");
    
    fprintf(stdout, "line 1\n");
    fflush(stdout);
    print_time("after line 1");
    
    sleep(1);
    
    fprintf(stdout, "line 2\n");
    fflush(stdout);
    print_time("after line 2");
    
    sleep(2);
    
    fprintf(stdout, "line 3\n");
    fflush(stdout);
    print_time("after line 3");
    
    return 0;
}

Example.zip

@dannys42
Copy link
Copy Markdown
Author

dannys42 commented May 15, 2026

Oh wait... I just realized the fix you mentioned is in main and not yet released.

@dannys42
Copy link
Copy Markdown
Author

Yes, the fix seems to resolve the issue for me. Thank you!

@dannys42 dannys42 closed this May 15, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants