Conversation

@0oshowero0
Collaborator

Background

In PR #9, we introduced initial support for the StreamingDataLoader interface. Currently, the system assumes prompts are pre-loaded into the TransferQueue. However, a critical use case involves generation workers putting both prompts and responses into the TransferQueue on the fly (e.g., the rollout_buffer mechanism in Slime).

Since TransferQueue supports dynamic expansion, the queue appears empty until the producer pushes its first data. Consequently, the consumer's check_consumption_status API incorrectly concludes that no data is available and prematurely terminates the data-retrieval iteration.

Solution

This PR introduces a new environment variable, TQ_PRE_ALLOC_SAMPLE_NUM, to handle sample pre-allocation in TransferQueue.

  • Mechanism: When set (typically to global_batch_size), the controller pre-allocates a fixed number of global indexes before data production begins.
  • Effect: The check_consumption_status API now accounts for these pre-allocated slots. This ensures the StreamingDataLoader waits for the pending data instead of exiting immediately when the TransferQueue is temporarily empty.
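The mechanism described above can be sketched as follows. This is a hedged illustration: the Partition class, register_pre_allocated_indexes, and consumption_finished are simplified stand-ins that mirror the names in this PR, not the actual TransferQueue controller code.

```python
# Illustrative sketch of sample pre-allocation: reserved slots keep the
# consumer waiting instead of exiting when the queue is temporarily empty.
class Partition:
    def __init__(self):
        self.pre_allocated = []   # indexes reserved before production begins
        self.consumed = set()

    def register_pre_allocated_indexes(self, indexes):
        self.pre_allocated.extend(indexes)

    def consumption_finished(self):
        # Pending (not-yet-produced) pre-allocated slots keep the consumer
        # iterating rather than terminating on an apparently empty queue.
        pending = len(self.pre_allocated) - len(self.consumed)
        return pending == 0


pre_alloc = 4  # in practice: int(os.environ["TQ_PRE_ALLOC_SAMPLE_NUM"]), e.g. global_batch_size
partition = Partition()
partition.register_pre_allocated_indexes(list(range(pre_alloc)))

print(partition.consumption_finished())  # False: slots pending, keep waiting
partition.consumed.update(range(pre_alloc))
print(partition.consumption_finished())  # True: all pre-allocated samples consumed
```

Without the pre-allocated slots, the same check would report "finished" the moment the queue is empty, which is exactly the premature-exit bug described above.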

Other Changes

Deprecates TQ_INIT_SAMPLE_NUM, TQ_INIT_FIELD_NUM, and TQ_SAMPLE_MIN_EXPANSION_SIZE for simplicity.


CC: @NINGBENZHE

…mingDataLoader

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Copilot AI review requested due to automatic review settings January 27, 2026 10:13
@0oshowero0 0oshowero0 changed the title [StreamingDataLoader, 4/N] feat: Support fully streamed scenario by pre-allocate samples [StreamingDataLoader, 4/N] feat: Introduce sample pre-allocation for dynamic streaming Jan 27, 2026

Copilot AI left a comment

Pull request overview

Adds support for “fully streamed” producer/consumer workflows by pre-allocating sample indices in TransferQueue, so consumers don’t terminate early when the queue is temporarily empty.

Changes:

  • Introduces TQ_PRE_ALLOC_SAMPLE_NUM and pre-allocates global indexes per partition to support streamed production/consumption.
  • Updates consumption/production status APIs to include pre-allocated (not-yet-produced) indexes.
  • Updates the streaming tutorial and adds tests covering pre-allocation behavior.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.

File Description
tutorial/05_streaming_dataloader.py Adds guidance to set TQ_PRE_ALLOC_SAMPLE_NUM for fully streamed usage.
transfer_queue/dataloader/streaming_dataset.py Replaces a TODO with user guidance about TQ_PRE_ALLOC_SAMPLE_NUM.
transfer_queue/controller.py Implements pre-allocation, status reporting changes, and env var deprecations.
tests/test_controller_data_partitions.py Adds unit tests validating pre-allocated index behavior.


Comment on lines +846 to +848
# Pre-allocate global indexes for consumer consumption tracking
global_indexes = self.index_manager.allocate_indexes(partition_id, count=TQ_PRE_ALLOC_SAMPLE_NUM)
self.partitions[partition_id].register_pre_allocated_indexes(global_indexes)

Copilot AI Jan 27, 2026


Pre-allocation expands tensors to max(pre_allocated_indexes) + 1. Since PartitionIndexManager.allocate_indexes() uses a single monotonically increasing global_index_counter across partitions, creating multiple partitions will make later partitions pre-allocate higher index ranges (e.g., partition2 gets [N..2N-1]) and therefore allocate ~2N rows even though only N samples were pre-allocated. This scales memory with the global index value rather than per-partition sample count. Consider using per-partition local row indices (with a global<->local mapping) or changing the index allocation scheme so each partition’s indices start from 0 (or a compact range) to keep tensors sized to partition cardinality.

Suggested change
# Pre-allocate global indexes for consumer consumption tracking
global_indexes = self.index_manager.allocate_indexes(partition_id, count=TQ_PRE_ALLOC_SAMPLE_NUM)
self.partitions[partition_id].register_pre_allocated_indexes(global_indexes)
# Pre-allocate compact, partition-local indexes for consumer consumption tracking.
# Using local indexes (0..TQ_PRE_ALLOC_SAMPLE_NUM-1) avoids tensor sizes scaling with
# any global index counter shared across partitions.
pre_alloc_indexes = list(range(TQ_PRE_ALLOC_SAMPLE_NUM))
self.partitions[partition_id].register_pre_allocated_indexes(pre_alloc_indexes)

Collaborator Author


This is intentional: addressing directly via global_index keeps the lookup logic simple.

@NINGBENZHE
Contributor

Great work! 👏

Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>

Copilot AI left a comment


Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (1)

transfer_queue/controller.py:335

  • ensure_samples_capacity() now expands the tensors by exactly expansion_needed every time. For workloads that append samples incrementally (common in streaming), this causes repeated realloc+copy and can devolve into O(n^2) behavior. Consider restoring a minimum growth factor (e.g., geometric growth / doubling or a configurable minimum expansion) to reduce reallocations.
        current_sample_space = self.allocated_samples_num
        if required_samples > current_sample_space:
            # Expand rows
            expansion_needed = required_samples - current_sample_space
            new_samples = current_sample_space + expansion_needed
            new_fields = self.production_status.shape[1]

            expanded_tensor = torch.zeros(new_samples, new_fields, dtype=torch.int8)
            expanded_tensor[:current_sample_space, :] = self.production_status


Comment on lines +345 to +347
"\nIn real-world usage, please export the environment variable of TQ_PRE_ALLOC_SAMPLE_NUM to "
"global_batch_size to make sure consumers can accurately determine consumption status even before "
"producers have generated the samples."

Copilot AI Jan 28, 2026


This user-facing message is a bit unclear/grammatically awkward (“export the environment variable of … to …”). Consider rephrasing to an explicit command (e.g., export TQ_PRE_ALLOC_SAMPLE_NUM=<global_batch_size>) or otherwise clarify that the env var should be set equal to global_batch_size.

Suggested change
"\nIn real-world usage, please export the environment variable of TQ_PRE_ALLOC_SAMPLE_NUM to "
"global_batch_size to make sure consumers can accurately determine consumption status even before "
"producers have generated the samples."
"\nIn real-world usage, set the environment variable TQ_PRE_ALLOC_SAMPLE_NUM equal to your "
"global batch size so consumers can accurately determine consumption status even before "
"producers have generated the samples, for example:\n"
" export TQ_PRE_ALLOC_SAMPLE_NUM=<global_batch_size>"

Comment on lines +65 to +68
# Sample pre-allocation for StreamingDataLoader compatibility.
# By pre-allocating sample indices (typically global_batch_size), consumers can accurately
# determine consumption status even before producers have generated the samples.
TQ_PRE_ALLOC_SAMPLE_NUM = int(os.environ.get("TQ_PRE_ALLOC_SAMPLE_NUM", 1))

Copilot AI Jan 28, 2026


TQ_PRE_ALLOC_SAMPLE_NUM is defaulted to 1 and create_partition() always calls allocate_indexes(..., count=TQ_PRE_ALLOC_SAMPLE_NUM). This makes pre-allocation effectively mandatory and also prevents setting the value to 0 (since allocate_indexes raises for count<=0), which is inconsistent with the PR description implying this is an opt-in behavior. Consider defaulting to 0 and skipping pre-allocation when the value is 0, or otherwise document/justify why every partition must always reserve at least one index.

Comment on lines +1034 to +1036
assert partition is not None
batch_global_indexes = partition.activate_pre_allocated_indexes(batch_size)


Copilot AI Jan 28, 2026


Avoid using assert for runtime validation in production code (assert can be optimized away with python -O). If partition being None is not expected, raise a clear exception instead so the failure mode is deterministic.
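The suggested fix can be sketched as follows. This is a hedged illustration: activate_batch and the error message are illustrative stand-ins, not the controller's actual code; the point is that an explicit raise survives `python -O`, whereas an `assert` is stripped.

```python
def activate_batch(partitions, partition_id, batch_size):
    # Explicit validation instead of `assert partition is not None`:
    # asserts are removed under `python -O`, so the failure mode would
    # otherwise become a confusing AttributeError on None.
    partition = partitions.get(partition_id)
    if partition is None:
        raise RuntimeError(
            f"Partition {partition_id!r} not found; it must be created "
            "before pre-allocated indexes can be activated."
        )
    return partition.activate_pre_allocated_indexes(batch_size)
```

The raised message also names the missing partition, which makes the failure deterministic and debuggable regardless of interpreter flags.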
