[StreamingDataLoader, 4/N] feat: Introduce sample pre-allocation for dynamic streaming #16
Conversation
…mingDataLoader Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Pull request overview
Adds support for “fully streamed” producer/consumer workflows by pre-allocating sample indices in TransferQueue, so consumers don’t terminate early when the queue is temporarily empty.
Changes:
- Introduces `TQ_PRE_ALLOC_SAMPLE_NUM` and pre-allocates global indexes per partition to support streamed production/consumption.
- Updates consumption/production status APIs to include pre-allocated (not-yet-produced) indexes.
- Updates the streaming tutorial and adds tests covering pre-allocation behavior.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `tutorial/05_streaming_dataloader.py` | Adds guidance to set `TQ_PRE_ALLOC_SAMPLE_NUM` for fully streamed usage. |
| `transfer_queue/dataloader/streaming_dataset.py` | Replaces a TODO with user guidance about `TQ_PRE_ALLOC_SAMPLE_NUM`. |
| `transfer_queue/controller.py` | Implements pre-allocation, status reporting changes, and env var deprecations. |
| `tests/test_controller_data_partitions.py` | Adds unit tests validating pre-allocated index behavior. |
```python
# Pre-allocate global indexes for consumer consumption tracking
global_indexes = self.index_manager.allocate_indexes(partition_id, count=TQ_PRE_ALLOC_SAMPLE_NUM)
self.partitions[partition_id].register_pre_allocated_indexes(global_indexes)
```
Copilot AI · Jan 27, 2026
Pre-allocation expands tensors to `max(pre_allocated_indexes) + 1`. Since `PartitionIndexManager.allocate_indexes()` uses a single monotonically increasing `global_index_counter` across partitions, creating multiple partitions will make later partitions pre-allocate higher index ranges (e.g., partition 2 gets `[N..2N-1]`) and therefore allocate ~2N rows even though only N samples were pre-allocated. This scales memory with the global index value rather than the per-partition sample count. Consider using per-partition local row indices (with a global↔local mapping) or changing the index allocation scheme so each partition's indices start from 0 (or a compact range) to keep tensors sized to partition cardinality.
Suggested change:

```diff
-# Pre-allocate global indexes for consumer consumption tracking
-global_indexes = self.index_manager.allocate_indexes(partition_id, count=TQ_PRE_ALLOC_SAMPLE_NUM)
-self.partitions[partition_id].register_pre_allocated_indexes(global_indexes)
+# Pre-allocate compact, partition-local indexes for consumer consumption tracking.
+# Using local indexes (0..TQ_PRE_ALLOC_SAMPLE_NUM-1) avoids tensor sizes scaling with
+# any global index counter shared across partitions.
+pre_alloc_indexes = list(range(TQ_PRE_ALLOC_SAMPLE_NUM))
+self.partitions[partition_id].register_pre_allocated_indexes(pre_alloc_indexes)
```
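The other option the comment mentions, keeping global indexes for addressing while sizing tensors per partition, amounts to a global↔local mapping. A minimal sketch follows; `PartitionIndexMap` and its method names are hypothetical and not part of this PR:

```python
class PartitionIndexMap:
    """Hypothetical per-partition map between global indexes and compact local rows.

    Local rows run 0..N-1 regardless of where the shared global counter stands,
    so status tensors can be sized to the partition's own sample count.
    """

    def __init__(self):
        self._global_to_local = {}
        self._local_to_global = []

    def register(self, global_index: int) -> int:
        # Assign the next compact local row to this global index.
        local = len(self._local_to_global)
        self._global_to_local[global_index] = local
        self._local_to_global.append(global_index)
        return local

    def to_local(self, global_index: int) -> int:
        return self._global_to_local[global_index]

    def to_global(self, local_index: int) -> int:
        return self._local_to_global[local_index]
```

With this mapping, a partition that pre-allocates N samples needs tensors of exactly N rows even if its global indexes start at N, 2N, or higher.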
This is intended to simplify addressing via `global_index`.
Great work! 👏
Signed-off-by: 0oshowero0 <o0shower0o@outlook.com>
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
transfer_queue/controller.py:335
`ensure_samples_capacity()` now expands the tensors by exactly `expansion_needed` every time. For workloads that append samples incrementally (common in streaming), this causes repeated realloc+copy and can devolve into O(n^2) behavior. Consider restoring a minimum growth factor (e.g., geometric growth / doubling, or a configurable minimum expansion) to reduce reallocations.
```python
current_sample_space = self.allocated_samples_num
if required_samples > current_sample_space:
    # Expand rows
    expansion_needed = required_samples - current_sample_space
    new_samples = current_sample_space + expansion_needed
    new_fields = self.production_status.shape[1]
    expanded_tensor = torch.zeros(new_samples, new_fields, dtype=torch.int8)
    expanded_tensor[:current_sample_space, :] = self.production_status
```
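The geometric-growth policy the comment suggests can be sketched in plain Python; the helper name and default growth factor here are illustrative, not the PR's API:

```python
def grow_capacity(current_rows: int, required_rows: int, growth: float = 2.0) -> int:
    """Return a new row capacity that is at least `required_rows`.

    Growing geometrically (doubling by default) means a sequence of n
    one-row appends triggers only O(log n) reallocations and O(n) total
    copy work, instead of the O(n^2) copying of exact-size expansion.
    """
    if required_rows <= current_rows:
        return current_rows  # already enough room, no realloc needed
    if current_rows == 0:
        return required_rows  # first allocation: size exactly to demand
    return max(required_rows, int(current_rows * growth))
```

The controller would then allocate `grow_capacity(current_sample_space, required_samples)` rows instead of `current_sample_space + expansion_needed`.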
```python
"\nIn real-world usage, please export the environment variable of TQ_PRE_ALLOC_SAMPLE_NUM to "
"global_batch_size to make sure consumers can accurately determine consumption status even before "
"producers have generated the samples."
```
Copilot AI · Jan 28, 2026
This user-facing message is a bit unclear and grammatically awkward ("export the environment variable of … to …"). Consider rephrasing it as an explicit command (e.g., `export TQ_PRE_ALLOC_SAMPLE_NUM=<global_batch_size>`) or otherwise clarifying that the env var should be set equal to `global_batch_size`.
Suggested change:

```diff
-"\nIn real-world usage, please export the environment variable of TQ_PRE_ALLOC_SAMPLE_NUM to "
-"global_batch_size to make sure consumers can accurately determine consumption status even before "
-"producers have generated the samples."
+"\nIn real-world usage, set the environment variable TQ_PRE_ALLOC_SAMPLE_NUM equal to your "
+"global batch size so consumers can accurately determine consumption status even before "
+"producers have generated the samples, for example:\n"
+"    export TQ_PRE_ALLOC_SAMPLE_NUM=<global_batch_size>"
```
```python
# Sample pre-allocation for StreamingDataLoader compatibility.
# By pre-allocating sample indices (typically global_batch_size), consumers can accurately
# determine consumption status even before producers have generated the samples.
TQ_PRE_ALLOC_SAMPLE_NUM = int(os.environ.get("TQ_PRE_ALLOC_SAMPLE_NUM", 1))
```
Copilot AI · Jan 28, 2026
`TQ_PRE_ALLOC_SAMPLE_NUM` defaults to 1, and `create_partition()` always calls `allocate_indexes(..., count=TQ_PRE_ALLOC_SAMPLE_NUM)`. This makes pre-allocation effectively mandatory and also prevents setting the value to 0 (since `allocate_indexes` raises for `count <= 0`), which is inconsistent with the PR description implying this is opt-in behavior. Consider defaulting to 0 and skipping pre-allocation when the value is 0, or otherwise document/justify why every partition must always reserve at least one index.
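A sketch of the opt-in behavior the comment proposes, where a count of 0 disables pre-allocation and `allocate_indexes` is never called with an invalid count; the helper names and signatures here are illustrative, not the PR's actual API:

```python
import os


def read_pre_alloc_num(default: int = 0) -> int:
    # The comment suggests defaulting to 0, meaning "pre-allocation disabled".
    return int(os.environ.get("TQ_PRE_ALLOC_SAMPLE_NUM", default))


def maybe_pre_allocate(allocate_indexes, partition_id, count: int):
    # Skip allocation entirely when disabled, so allocate_indexes
    # (which raises for count <= 0) is never called with an invalid count.
    if count > 0:
        return allocate_indexes(partition_id, count=count)
    return []
```

This keeps the streaming use case one `export` away while leaving non-streaming partitions with no reserved indexes by default.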
```python
assert partition is not None
batch_global_indexes = partition.activate_pre_allocated_indexes(batch_size)
```
Copilot AI · Jan 28, 2026
Avoid using `assert` for runtime validation in production code (`assert` can be optimized away with `python -O`). If `partition` being `None` is not expected, raise a clear exception instead so the failure mode is deterministic.
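The deterministic alternative the comment asks for could look like the following; the exception type and helper name are illustrative, not from the PR:

```python
class PartitionNotFoundError(RuntimeError):
    """Raised when a partition id has no registered partition."""


def require_partition(partitions: dict, partition_id):
    # Unlike `assert`, this check survives `python -O` and fails with
    # a clear, deterministic error message naming the missing partition.
    partition = partitions.get(partition_id)
    if partition is None:
        raise PartitionNotFoundError(f"partition {partition_id!r} does not exist")
    return partition
```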
Background

In PR #9, we introduced initial support for the `StreamingDataLoader` interface. Currently, the system assumes prompts are pre-loaded into the TransferQueue. However, a critical use case involves generation workers putting both prompts and responses into `TransferQueue` on the fly (e.g., the `rollout_buffer` mechanism in Slime).

Since TransferQueue supports dynamic expansion, if the producer has not yet pushed any data, the TransferQueue appears empty. Consequently, the consumer's `check_consumption_status` API incorrectly assumes no data is available and prematurely terminates the data retrieval iteration.

Solution

This PR introduces a new environment variable, `TQ_PRE_ALLOC_SAMPLE_NUM`, to handle sample pre-allocation in TransferQueue:

- When set (typically to `global_batch_size`), the controller pre-allocates a fixed number of global indexes before data production begins.
- The `check_consumption_status` API now accounts for these pre-allocated slots. This ensures the `StreamingDataLoader` waits for the pending data instead of exiting immediately when the TransferQueue is temporarily empty.

Other Changes

Deprecate `TQ_INIT_SAMPLE_NUM`, `TQ_INIT_FIELD_NUM`, and `TQ_SAMPLE_MIN_EXPANSION_SIZE` for simplicity.

CC: @NINGBENZHE