_wait_generators_bulk perf#63808

Draft
rueian wants to merge 20 commits into ray-project:master from rueian:worker-wait-fetch-perf

@rueian rueian commented Jun 2, 2026

DO NOT MERGE

rueian added 15 commits April 17, 2026 23:42
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces bulk consumption and peeking APIs for streaming generators in Ray, including _wait_and_fetch and _wait_generators_bulk in Python, along with corresponding C++ CoreWorker implementations (WaitAndFetch, TryReadObjectRefStreamN, and PeekObjectRefStreamN). These changes are integrated into Ray Data's streaming executor to optimize task completion waiting. The review feedback highlights several critical issues: potential runtime NameErrors due to a missing time import and an un-namespaced exception reference, a bug in TryReadObjectRefStreamN where backpressure_threshold defaults to 0 instead of -1 (which could cause generator hangs), a potential crash from a strict RAY_CHECK_EQ on mismatched task IDs, and the risky use of assert False for control flow validation.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

core_worker = self.worker.core_worker
try:
    core_worker.try_read_next_object_ref_stream_n(self._generator_ref, num_refs)
except ObjectRefStreamEndOfStreamError:

high

The exception ObjectRefStreamEndOfStreamError is referenced here but it may not be imported in this file's namespace, which would cause a NameError at runtime. It is safer to reference it via ray.exceptions.ObjectRefStreamEndOfStreamError or explicitly import it.

Suggested change
- except ObjectRefStreamEndOfStreamError:
+ except ray.exceptions.ObjectRefStreamEndOfStreamError:
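
To illustrate why this kind of bug tends to slip past happy-path tests, here is a minimal sketch that does not depend on Ray. The name `UndefinedStreamError` is hypothetical and deliberately left undefined. Python evaluates the expression in an `except` clause only when an exception actually reaches it, so the `NameError` appears only on the error path.

```python
def read_stream(raise_error):
    """Toy reader whose except clause names an exception that was never imported."""
    try:
        if raise_error:
            raise ValueError("stream failed")
        return "ok"
    except UndefinedStreamError:  # hypothetical name, intentionally undefined
        return "end of stream"


# Happy path: nothing is raised, so the except expression is never evaluated.
normal_path = read_stream(raise_error=False)

# Error path: the lookup of the undefined name happens now and fails.
try:
    read_stream(raise_error=True)
except NameError as exc:
    failure = type(exc).__name__
else:
    failure = None

print(normal_path, failure)
```

If the name is in fact imported elsewhere in the module, the original code is fine; the qualified `ray.exceptions.` form simply removes the dependency on that import.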

from typing import TYPE_CHECKING, Dict, List, Optional, Tuple

import ray
from ray._private.worker import _wait_generators_bulk

high

The time module is used in this file (e.g., time.monotonic()) but is not imported at the top of the file. This will result in a NameError at runtime. Please import time.

Suggested change
- from ray._private.worker import _wait_generators_bulk
+ import time
+ from ray._private.worker import _wait_generators_bulk

Status TaskManager::TryReadObjectRefStreamN(const ObjectID &generator_id,
                                            int64_t num_items) {
  RAY_CHECK_GT(num_items, 0);
  auto backpressure_threshold = 0;

high

In TryReadObjectRefStreamN, backpressure_threshold is initialized to 0. However, in the singular TryReadObjectRefStream, it is initialized to -1. If the task is not found in submissible_tasks_, defaulting to 0 instead of -1 incorrectly enables backpressure with a threshold of 0, which can cause the generator to hang. It should be initialized to -1 to match the singular version's behavior.

Suggested change
- auto backpressure_threshold = 0;
+ auto backpressure_threshold = -1;
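
The difference between the two defaults can be shown with a toy model in Python. This is only a sketch under stated assumptions: `producer_should_pause` and the `>=` comparison are hypothetical and are not taken from the TaskManager source. The only detail carried over from the comment above is that -1 acts as the "backpressure disabled" sentinel.

```python
BACKPRESSURE_DISABLED = -1  # sentinel value, per the review comment above


def producer_should_pause(unconsumed_items: int, threshold: int) -> bool:
    """Hypothetical check: pause once the unconsumed count reaches the threshold."""
    if threshold == BACKPRESSURE_DISABLED:
        return False
    return unconsumed_items >= threshold


# With a threshold of 0, the producer is paused even when nothing is pending,
# so under this model it would never be allowed to produce its first item.
paused_with_zero = producer_should_pause(unconsumed_items=0, threshold=0)

# With the sentinel, backpressure is off and the producer keeps running.
paused_with_sentinel = producer_should_pause(unconsumed_items=0, threshold=-1)

print(paused_with_zero, paused_with_sentinel)
```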


bool ObjectRefStream::IsObjectRefAfterEndOfStream(const ObjectID &object_id) const {
  RAY_CHECK_NE(end_of_stream_index_, -1);
  RAY_CHECK_EQ(object_id.TaskId(), generator_task_id_);

high

Using RAY_CHECK_EQ(object_id.TaskId(), generator_task_id_) here can cause a crash if temporarily_owned_refs_ contains any object IDs from other tasks. Since TemporarilyInsertToStreamIfNeeded defensively allows object IDs with different task IDs to be inserted, we should handle this gracefully by returning false instead of crashing the process.

Suggested change
- RAY_CHECK_EQ(object_id.TaskId(), generator_task_id_);
+ if (object_id.TaskId() != generator_task_id_) {
+ return false;
+ }
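
The intended behavior can be sketched in Python as a rough model rather than the actual C++ logic. `ObjectId`, `StreamModel`, and the index comparison are hypothetical stand-ins. The point is only that a foreign task ID yields `False`, while the end-of-stream invariant is still enforced.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ObjectId:
    """Hypothetical stand-in for an object ID: owning task plus return index."""
    task_id: str
    index: int


class StreamModel:
    """Toy model of a stream that knows its generator task and end-of-stream index."""

    def __init__(self, generator_task_id: str, end_of_stream_index: int):
        self.generator_task_id = generator_task_id
        self.end_of_stream_index = end_of_stream_index

    def is_after_end_of_stream(self, object_id: ObjectId) -> bool:
        # Still a hard invariant: the end of stream must already be known.
        if self.end_of_stream_index == -1:
            raise RuntimeError("end of stream is not known yet")
        # IDs from other tasks are not part of this stream: answer False, do not crash.
        if object_id.task_id != self.generator_task_id:
            return False
        # Assumed comparison for illustration only.
        return object_id.index >= self.end_of_stream_index


stream = StreamModel(generator_task_id="task-a", end_of_stream_index=3)
foreign = stream.is_after_end_of_stream(ObjectId("task-b", 7))
past_end = stream.is_after_end_of_stream(ObjectId("task-a", 5))
before_end = stream.is_after_end_of_stream(ObjectId("task-a", 1))
print(foreign, past_end, before_end)
```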

except ray.exceptions.ObjectRefStreamEndOfStreamError:
    try:
        ray.get(self._pending_block_ref)
        assert False, "Above ray.get should raise an exception."

medium

Using assert False for control flow validation is risky because assertions can be optimized away when Python is run with the -O flag. If optimized away, this block will silently succeed instead of raising an exception. It is better to explicitly raise a RuntimeError.

Suggested change
- assert False, "Above ray.get should raise an exception."
+ raise RuntimeError("Above ray.get should raise an exception.")
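
The effect of `-O` can be reproduced without restarting the interpreter. The sketch below uses the built-in `compile()` with `optimize=1`, which strips `assert` statements the same way `-O` does. The function names `guard_with_assert` and `guard_with_raise` are hypothetical and exist only for this demonstration.

```python
SOURCE = '''
def guard_with_assert():
    assert False, "Above call should raise an exception."
    return "fell through"

def guard_with_raise():
    raise RuntimeError("Above call should raise an exception.")
'''


def load(optimize):
    """Compile SOURCE at the given optimization level and return its namespace."""
    namespace = {}
    exec(compile(SOURCE, "<sketch>", "exec", optimize=optimize), namespace)
    return namespace


def outcome(func):
    """Return the function's result, or the name of the exception it raised."""
    try:
        return func()
    except (AssertionError, RuntimeError) as exc:
        return type(exc).__name__


normal = load(optimize=0)     # asserts kept
optimized = load(optimize=1)  # asserts removed, as with python -O

results = {
    "assert, normal": outcome(normal["guard_with_assert"]),
    "assert, optimized": outcome(optimized["guard_with_assert"]),
    "raise, normal": outcome(normal["guard_with_raise"]),
    "raise, optimized": outcome(optimized["guard_with_raise"]),
}
print(results)
```

Only the explicit `raise` behaves the same at both optimization levels.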

rueian added 5 commits June 4, 2026 09:52
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
Signed-off-by: Rueian Huang <rueiancsie@gmail.com>
@rueian rueian force-pushed the worker-wait-fetch-perf branch from adf9211 to c31abae Compare June 8, 2026 21:05