Skip to content

[core] (FreeObjects 1/n) Adding FreeLocalObjects RPC for owner-driven free mechanism#63218

Merged
edoakes merged 37 commits into
ray-project:masterfrom
aaronscalene:aaron/passive-owner-callback
Jun 9, 2026
Merged

[core] (FreeObjects 1/n) Adding FreeLocalObjects RPC for owner-driven free mechanism#63218
edoakes merged 37 commits into
ray-project:masterfrom
aaronscalene:aaron/passive-owner-callback

Conversation

@aaronscalene

@aaronscalene aaronscalene commented May 8, 2026

Copy link
Copy Markdown
Contributor

Motivation

Within this multi-step PR process, our goal is to change our current FreeObjects implementation to an owner driven process. Current design of FreeObjects involves spraying the entire cluster with FreeObjects RPCs. This introduces many caveats: bloating observability with FreeObjects RPC counts, sending excessive RPCs, and worse mental model from owner-driven reference counting. This is a continuation from #63117. Due to the race condition detailed in the testing section of the PR, we decided to merge both changes for a complete PR.

Our current design for FreeObjects involves the following steps:

  1. The reference goes out of scope. The reference here represents both the owner's python references, as well as other borrower references throughout the cluster, which is tracked through the owner reference.
  2. The owner finds out, and sends a pubsub message (initialized when pinning the primary copy) to the primary copy raylet.
  3. The primary copy raylet frees its primary copy, and then broadcasts free secondary copies to all nodes within the cluster. (This is batched and sent periodically). The reason the primary copy raylet needs to broadcast secondary copies is because it doesn't store object locations for its primary copy, this is only stored on the owner. Thus, in order to ensure all secondary copies are freed, there is a need to broadcast to all nodes.

Design

As noted above, the owner stores the locations of all copies of the objects, where pinned_at_node_id_ represents the primary copy location and the locations vector demonstrates all secondary object locations.

We now move towards a new design FreeObjects. We will get rid of the pubsub between owner -> primary copy raylet. Instead:

  1. The reference goes out of scope. The reference here represents both the owner's python references, as well as other borrower references throughout the cluster, which is tracked through the owner reference.
  2. The owner sends RPCs to primary and secondary copy holders to free their objects. (Since it has all of its locations)

However, this doesn't easily solve our issue since the pubsub also has an owner_dead_callback, which fires when the owner has died. This will help clean up the primary object and ensure there are no memory leaks in this case. To continue supporting this functionality, if the owner dies, this information is propagated by the GCS, and thus each raylet just listens to the GCS events and updates their states accordingly.

Originally, the design was that:

  • the pubsub connection fails / ends for some unexpected reason
  • the owner is thought to be dead, so the primary copy raylet then frees its primary copy and queues up FreeObjects to broadcast to all secondary copies

The new design:

  • The GCS already publishes WorkerFailure and NodeRemoved announcements to all raylets. This is so that raylets know to release worker leases of dead nodes / workers
  • With this new design, the raylets will listen and additionally clear any objects that had their owner on the failed worker / node
  • Something to consider is how exactly GCS gets these notifications, and if there would be any slowness in these notifications.

1.) When the worker process dies (but the node remains alive), the raylet picks up the worker death relatively quickly via the broken IPC connection and sends the worker death up to the GCS, and GCS broadcasts it.

2.) On node death, similar story.

However, in old implementation, the long poll connection would be sitting the retryable grpc client queue waiting for either the connection to come back up or the GCS to get a worker death notification. Hence all in all, there really shouldn't be a significant change in behavior.

Description

This PR covers the first and second part of the FreeObjects change. The first part is strictly adding functionality for the owner to be in charge of freeing objects. We make the first step away from this design such that the owner calls a new FreeLocalObjects RPC to all primary and secondary copy locations when OnObjectsOutOfScopeOrFreed. The second part amends the functionality of the current owner dead pubsub callback to be moved from an eager pubsub death callback to a more passive listener callback.

Considerations

NodeManager RPC vs. ObjectManager RPC

For this PR, I considered either 1) Adding a new RPC to NodeManager or 2) Adding new route that calls into existing ObjectManager FreeObjects. Ultimately, I decided to go with 1). A design consideration here is that workers currently only call into raylets' NodeManager, so part of this is to keep with existing implementation. Another reason is that our new HandleFreeLocalObjects would be responsible for 2 things: 1) if primary copy, delete metadata from LocalObjectManager (manages spilled and primary copies) and 2) if any copy, queue up within FreeObjects. Thus, NodeManager currently has functionality that calls into LocalObjectManager. Furthermore, I find that directly interfacing with ObjectManager is not as intuitive, and creating a new path just for ObjectManager does not seem ideal.

Moving Around ReferenceCounter Variables

For this PR, I decided to follow similar formats in the codebase, and have all the lambdas within the constructor shifted to the end. In this case, they can be considered optional arguments and can be defaulted to null if not specified. This leads to easier testing implementations. A tricky part is to ensure these functions generate no-op if they are not specified.

Only Owner Frees

An important consideration is that now that we are sending from the worker side, only if the object is freed is owned by the current worker should the worker send RPC requests to the object locations. Otherwise, the worker should just keep quiet.

Dead Code Deleted in Future

For this PR, I kept FreeObjects's local_only=false case still alive, even though this case is essentially never called again. In the future, I will delete this dead code.

Testing Changes and ReferenceCounter Changes

For this PR, there were many testing changes that are needed because of this new path that is different from the first one. For now, many of these tests are just hardcoded to match correct behavior, but in the future some unnecessary tests might be deleted or shuffled around to match the new implementation.

Pass Over All Objects

This new design introduces a pass over all objects within the current node every announcement. However, failed nodes and worker announcements should not be too common and if the object is not owned by the failed worker / node it would be a no-op; thus this should not lead to any latency issues.

Owner is Less Eager

This new design will be a less eager free, especially for the primary copy of the node. This is definitely an important part to consider, however since it is similar a pubsub for node information, the latency should not be that big of a deal. Furthermore, the pubsub interface already keeps track of network failures etc. to some extent, and if there are further failures / GCS failures there are probably worse problems to deal with.

LocalObjectManager local_objects_ vs. ObjectManager local_objects_

An important distinction is needed between both classes' local objects, where we need to aggregate and free any objects within both classes. LocalObjectManager in this case keeps track of primary copies as well as spilled copies. ObjectManager keeps track of anything in plasma such as primary copies / secondary copies. Thus, we need to get the union of both objects to get the objects that we want to free.

Testing

Ran python tests and C++ tests adjacent to these changes to ensure no regressions. Future e2e testing in #63181

We will fix this race condition in the immediate next PR, which uses the GCS to broadcast deaths and have the raylets free accordingly.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new RPC mechanism, FreeLocalObjects, to allow the CoreWorker to explicitly instruct NodeManager instances to free local objects. This replaces the previous eviction-pubsub callback approach. My review identified several areas for improvement: the CoreWorker lambda capture should use shared_from_this for lifetime safety, the RPC implementation should be batched to avoid performance degradation in large clusters, and the now-unused objects_pending_deletion_ logic in LocalObjectManager should be cleaned up.

Comment thread src/ray/core_worker/core_worker.cc Outdated
Comment thread src/ray/core_worker/core_worker.cc Outdated
Comment thread src/ray/raylet/local_object_manager.cc
@aaronscalene aaronscalene force-pushed the aaron/passive-owner-callback branch from 483b867 to 9482fe8 Compare May 8, 2026 23:21
@aaronscalene aaronscalene changed the title [core] [DO NOT MERGE] (FreeObjects 2/n) [core] (FreeObjects 2/n) Added GCS passive owner death removal May 15, 2026
@aaronscalene aaronscalene changed the title [core] (FreeObjects 2/n) Added GCS passive owner death removal [core] (FreeObjects 2/n) Added GCS passive owner death object freeing May 15, 2026
Signed-off-by: aaron.li <aaron.li@anyscale.com>
Signed-off-by: aaron.li <aaron.li@anyscale.com>
Signed-off-by: aaron.li <aaron.li@anyscale.com>
Signed-off-by: aaron.li <aaron.li@anyscale.com>
Signed-off-by: aaron.li <aaron.li@anyscale.com>
@aaronscalene aaronscalene force-pushed the aaron/passive-owner-callback branch from 3c5129a to 01dd4d9 Compare May 15, 2026 21:20
Signed-off-by: aaron.li <aaron.li@anyscale.com>
Signed-off-by: aaron.li <aaron.li@anyscale.com>
@aaronscalene aaronscalene force-pushed the aaron/passive-owner-callback branch from 01dd4d9 to a8c096f Compare May 20, 2026 22:22
@aaronscalene aaronscalene marked this pull request as ready for review May 21, 2026 03:14
@aaronscalene aaronscalene requested a review from a team as a code owner May 21, 2026 03:14
Comment thread src/ray/protobuf/node_manager.proto
Comment thread src/ray/raylet/local_object_manager.cc Outdated
@ray-gardener ray-gardener Bot added the core Issues that should be addressed in Ray Core label May 21, 2026
Signed-off-by: aaron.li <aaron.li@anyscale.com>
Signed-off-by: aaron.li <aaron.li@anyscale.com>
v
Signed-off-by: aaron.li <aaron.li@anyscale.com>
Signed-off-by: aaron.li <aaron.li@anyscale.com>
i
Signed-off-by: aaron.li <aaron.li@anyscale.com>
Signed-off-by: aaron.li <aaron.li@anyscale.com>
Signed-off-by: aaron.li <aaron.li@anyscale.com>
Signed-off-by: aaron.li <aaron.li@anyscale.com>
Signed-off-by: aaron.li <aaron.li@anyscale.com>
Signed-off-by: aaron.li <aaron.li@anyscale.com>
Signed-off-by: aaron.li <aaron.li@anyscale.com>
Signed-off-by: aaron.li <aaron.li@anyscale.com>
@aaronscalene aaronscalene force-pushed the aaron/passive-owner-callback branch from a8c096f to cc91473 Compare May 22, 2026 21:24
Comment thread src/ray/core_worker/reference_counter.cc
@aaronscalene aaronscalene changed the title [core] (FreeObjects 2/n) Added GCS passive owner death object freeing [core] (FreeObjects 1/n) Added GCS passive owner death object freeing May 29, 2026
@aaronscalene aaronscalene changed the title [core] (FreeObjects 1/n) Added GCS passive owner death object freeing [core] (FreeObjects 1/n) Adding FreeLocalObjects RPC and gutting pubsub May 29, 2026
@aaronscalene aaronscalene changed the title [core] (FreeObjects 1/n) Adding FreeLocalObjects RPC and gutting pubsub [core] (FreeObjects 1/n) Adding FreeLocalObjects RPC for owner free mechanism May 29, 2026
@aaronscalene aaronscalene changed the title [core] (FreeObjects 1/n) Adding FreeLocalObjects RPC for owner free mechanism [core] (FreeObjects 1/n) Adding FreeLocalObjects RPC for owner-driven free mechanism May 30, 2026
Comment thread src/ray/core_worker/core_worker.h Outdated
/// Send a FreeLocalObjects RPC to every raylet holding a copy of the object.
///
/// \param object_id The object whose copies should be freed.
/// \param locations Nodes that hold a copy of the object. Must include

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the description for locations is a bit verbose. I'd prob just say something like "a list of all nodes that hold a copy of the object, including primary and secondary copies"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

[&worker_id](const ObjectInfo &info) { return info.owner_worker_id == worker_id; });
}

std::vector<ObjectID> ObjectManager::GetLocalObjectsOwnedBy(const NodeID &node_id) const {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though it's a bit more verbose, since nodes themselves don't "own" objects and just worker processes on those nodes own objects, I'd probably clarify this to something like "GetLocalObjectsOwnedByOwnersOnNode"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good clarification. I settled with GetLocalObjectsOwnedByOwnersOn(node_id) to be the same as GetLocalObjectsOwnedBy(worker_id).

const auto obj_id = ObjectID::FromBinary(object_id_binary);
ReleaseFreedObject(obj_id);
};
auto owner_dead_callback = [owner_address](const std::string &object_id_binary,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you leave a TODO and link the follow up PR here in that you'll clean this callback up completely + update the corresponding gtests to remove the usage of the pubsub?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

Comment thread src/ray/raylet/local_object_manager.cc Outdated
// Only free the object if it is not already freed.
void LocalObjectManager::ReleaseFreedLocalObject(const ObjectID &object_id) {
// Primary-copy bookkeeping only runs when we actually hold the primary copy
// and it hasn't been freed yet. Secondary copies fall through to the enqueue

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain what you mean by secondary copies fall through to the enqueue below? I'm still a bit confused on why you needed to remove the early return here. How were secondary copies cleaned up prior to this change, I'm assuming since the local object manager tracks primary copies only the early return caught them, so there was something else that cleaned them up?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally, this path was only called by primary copies, which then triggers the FreeObjects broadcast. Now, since each local object goes through this path, for primary copies we will remove any bookkeeping and for secondary copies we just want to enqueue it to be freed.

Signed-off-by: aaron.li <aaron.li@anyscale.com>

@aaronscalene aaronscalene left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reviewed

const auto obj_id = ObjectID::FromBinary(object_id_binary);
ReleaseFreedObject(obj_id);
};
auto owner_dead_callback = [owner_address](const std::string &object_id_binary,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

return GetCoreWorker()->gcs_client_->Nodes().IsNodeDead(node_id);
},
/*spread_free_local_objects=*/
[this](const ObjectID &object_id, const std::vector<NodeID> &locations) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: seems like the only place that that this is called has the locations in an unordered set. Do we need to do the conversion to a vector here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, done!

send_reply_callback(Status::OK(), nullptr, nullptr);
}

// TODO(aaronscalene) will delete local_only=false and related dead code in #63213

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: format should be TODO(#63213)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
}

std::vector<ObjectID> LocalObjectManager::GetLocalObjectsOwnedBy(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: this is duplicated in both object_manager and local_object_manager. What you could do is add these functions into a shared util file and pass the local_objects_ as a parameter.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^^^

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this might be hard because the LocalObjectInfo in object_manager is different from the LocalObjectInfo in local_object_manager, so there still needs to be four separate definitions, unless I add methods to LocalObjectInfos to compromise them together, which I don't really like. I decided to move LocalObjectsMatchedBy into a util function shared file, which I think is beneficial.

@Sparks0219 Sparks0219 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, nice job! @edoakes could you PTAL and merge?

@Sparks0219 Sparks0219 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be helpful to point out the failure model a bit more explicitly in the PR description.

1.) When the worker process dies (but the node remains alive), the raylet picks up the worker death relatively quickly via the broken IPC connection and sends the worker death up to the GCS, and GCS broadcasts it. Meanwhile, the long poll connection would be sitting the retryable grpc client queue waiting for either the connection to come back up or the GCS to get a worker death notification.

2.) On node death, similar story.

Hence all in all, there really shouldn't be a significant change in behavior.

Comment thread src/ray/raylet/local_object_manager.cc Outdated
void LocalObjectManager::ReleaseFreedLocalObject(const ObjectID &object_id) {
// This is called for both primary and secondary copies. For secondary copies, they
// should just be queued up to be freed below. For primary copies, additional
// bookkeeping is needed to ensure there is no regression.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does "no regression" mean?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, bad comment. Changed it to a more suitable comment.

}
}

std::vector<ObjectID> LocalObjectManager::GetLocalObjectsOwnedBy(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^^^

Comment thread src/ray/raylet/main.cc
[&](const std::vector<ray::ObjectID> &object_ids) {
object_manager->FreeObjects(object_ids,
/*local_only=*/false);
/*local_only=*/true);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

local_only flag will be removed in a future PR?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes #63213

Comment thread src/ray/raylet/node_manager.cc Outdated
for (const auto &id : object_manager_.GetLocalObjectsOwnedByOwnersOn(node_id)) {
ids.insert(id);
}
RAY_UNUSED(FreeLocalObjects(std::vector<ObjectID>(ids.begin(), ids.end())));

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why RAY_UNUSED? either handle the error if it's recoverable (at minimum a warning) or make it a RAY_CHECK

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Artifact of past implementation, no need for status anymore so changed to void, good catch.

publisher_.get(),
subscriber_.get(),
/*is_node_dead=*/[](const NodeID &) { return false; },
/*spread_free_local_objects=*/

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "spread" is a bit misleading here as it indicates that we are sending the RPC across the cluster

I would call this something like free_objects_on_nodes_async

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment thread src/ray/core_worker/core_worker.cc Outdated
Comment on lines +4568 to +4569
void CoreWorker::SpreadFreeLocalObjects(const ObjectID &object_id,
const std::vector<NodeID> &locations) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment on naming as above, something like FreeObjectOnNodesAsync

The "local" naming here is also a bit confusing -- it makes sense in the RPC to a specific raylet, but here we are actually freeing the object across non-local nodes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, done

Comment thread src/ray/core_worker/core_worker.cc Outdated
Comment on lines +4582 to +4587
if (!status.ok()) {
RAY_LOG(WARNING).WithField(object_id)
<< "FreeLocalObjects RPC to node " << node_id
<< " failed: " << status.ToString();
}
});

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the RPC retry automatically? In what case would we ever get !status.ok() and how should we handle it?

@aaronscalene aaronscalene Jun 4, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RPC does retry automatically as it is through RetryableGrpcClient. Thus, for any network related errors, this should automatically retry. I believe this status case will only happen when the Raylet dies. I looked at other implementations and they seem to just log a warning within the raylet client, which is what I ended up doing.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm it would be good to have a specific status for that (node died error) and possibly use the statusset for this @Sparks0219. That way clients are forced to explicitly handle a subset of possible errors as applicable.

Signed-off-by: aaron.li <aaron.li@anyscale.com>
Signed-off-by: aaron.li <aaron.li@anyscale.com>
Comment thread src/ray/raylet/node_manager.cc
@aaronscalene aaronscalene force-pushed the aaron/passive-owner-callback branch from 8f4c9cd to fdf18e3 Compare June 8, 2026 19:13
Signed-off-by: aaronscalene <aaron.li@anyscale.com>

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.

Fix All in Cursor

Reviewed by Cursor Bugbot for commit 9c25256. Configure here.

Comment thread src/ray/core_worker/reference_counter.cc
@edoakes edoakes enabled auto-merge (squash) June 8, 2026 21:39
Signed-off-by: aaron.li <aaron.li@anyscale.com>
@github-actions github-actions Bot disabled auto-merge June 8, 2026 22:45
@edoakes edoakes merged commit ce4ccec into ray-project:master Jun 9, 2026
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Issues that should be addressed in Ray Core go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants