_wait_generators_bulk perf#63808
Conversation
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
There was a problem hiding this comment.
Code Review
This pull request introduces bulk consumption and peeking APIs for streaming generators in Ray, including _wait_and_fetch and _wait_generators_bulk in Python, along with corresponding C++ CoreWorker implementations (WaitAndFetch, TryReadObjectRefStreamN, and PeekObjectRefStreamN). These changes are integrated into Ray Data's streaming executor to optimize task completion waiting. The review feedback highlights several critical issues: potential runtime NameErrors due to a missing time import and an un-namespaced exception reference, a bug in TryReadObjectRefStreamN where backpressure_threshold defaults to 0 instead of -1 (which could cause generator hangs), a potential crash from a strict RAY_CHECK_EQ on mismatched task IDs, and the risky use of assert False for control flow validation.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| core_worker = self.worker.core_worker | ||
| try: | ||
| core_worker.try_read_next_object_ref_stream_n(self._generator_ref, num_refs) | ||
| except ObjectRefStreamEndOfStreamError: |
There was a problem hiding this comment.
The exception ObjectRefStreamEndOfStreamError is referenced here but it may not be imported in this file's namespace, which would cause a NameError at runtime. It is safer to reference it via ray.exceptions.ObjectRefStreamEndOfStreamError or explicitly import it.
| except ObjectRefStreamEndOfStreamError: | |
| except ray.exceptions.ObjectRefStreamEndOfStreamError: |
| from typing import TYPE_CHECKING, Dict, List, Optional, Tuple | ||
|
|
||
| import ray | ||
| from ray._private.worker import _wait_generators_bulk |
There was a problem hiding this comment.
The time module is used in this file (e.g., time.monotonic()) but is not imported at the top of the file. This will result in a NameError at runtime. Please import time.
| from ray._private.worker import _wait_generators_bulk | |
| import time | |
| from ray._private.worker import _wait_generators_bulk |
| Status TaskManager::TryReadObjectRefStreamN(const ObjectID &generator_id, | ||
| int64_t num_items) { | ||
| RAY_CHECK_GT(num_items, 0); | ||
| auto backpressure_threshold = 0; |
There was a problem hiding this comment.
In TryReadObjectRefStreamN, backpressure_threshold is initialized to 0. However, in the singular TryReadObjectRefStream, it is initialized to -1. If the task is not found in submissible_tasks_, defaulting to 0 instead of -1 incorrectly enables backpressure with a threshold of 0, which can cause the generator to hang. It should be initialized to -1 to match the singular version's behavior.
| auto backpressure_threshold = 0; | |
| auto backpressure_threshold = -1; |
|
|
||
| bool ObjectRefStream::IsObjectRefAfterEndOfStream(const ObjectID &object_id) const { | ||
| RAY_CHECK_NE(end_of_stream_index_, -1); | ||
| RAY_CHECK_EQ(object_id.TaskId(), generator_task_id_); |
There was a problem hiding this comment.
Using RAY_CHECK_EQ(object_id.TaskId(), generator_task_id_) here can cause a crash if temporarily_owned_refs_ contains any object IDs from other tasks. Since TemporarilyInsertToStreamIfNeeded defensively allows object IDs with different task IDs to be inserted, we should handle this gracefully by returning false instead of crashing the process.
| RAY_CHECK_EQ(object_id.TaskId(), generator_task_id_); | |
| if (object_id.TaskId() != generator_task_id_) { | |
| return false; | |
| } |
| except ray.exceptions.ObjectRefStreamEndOfStreamError: | ||
| try: | ||
| ray.get(self._pending_block_ref) | ||
| assert False, "Above ray.get should raise an exception." |
There was a problem hiding this comment.
Using assert False for control flow validation is risky because assertions can be optimized away when Python is run with the -O flag. If optimized away, this block will silently succeed instead of raising an exception. It is better to explicitly raise a RuntimeError.
| assert False, "Above ray.get should raise an exception." | |
| raise RuntimeError("Above ray.get should raise an exception.") |
adf9211 to
c31abae
Compare
DO NOT MERGE