[core] (FreeObjects 1/n) Adding FreeLocalObjects RPC for owner-driven free mechanism #63218
Code Review
This pull request introduces a new RPC mechanism, FreeLocalObjects, to allow the CoreWorker to explicitly instruct NodeManager instances to free local objects. This replaces the previous eviction-pubsub callback approach. My review identified several areas for improvement: the CoreWorker lambda capture should use shared_from_this for lifetime safety, the RPC implementation should be batched to avoid performance degradation in large clusters, and the now-unused objects_pending_deletion_ logic in LocalObjectManager should be cleaned up.
Signed-off-by: aaron.li <aaron.li@anyscale.com>
/// Send a FreeLocalObjects RPC to every raylet holding a copy of the object.
///
/// \param object_id The object whose copies should be freed.
/// \param locations Nodes that hold a copy of the object. Must include
nit: the description for locations is a bit verbose. I'd prob just say something like "a list of all nodes that hold a copy of the object, including primary and secondary copies"
    [&worker_id](const ObjectInfo &info) { return info.owner_worker_id == worker_id; });
}

std::vector<ObjectID> ObjectManager::GetLocalObjectsOwnedBy(const NodeID &node_id) const {
Though it's a bit more verbose, since nodes themselves don't "own" objects and just worker processes on those nodes own objects, I'd probably clarify this to something like "GetLocalObjectsOwnedByOwnersOnNode"
Good clarification. I settled on GetLocalObjectsOwnedByOwnersOn(node_id) to mirror GetLocalObjectsOwnedBy(worker_id).
  const auto obj_id = ObjectID::FromBinary(object_id_binary);
  ReleaseFreedObject(obj_id);
};
auto owner_dead_callback = [owner_address](const std::string &object_id_binary,
Can you leave a TODO here and link the follow-up PR, noting that you'll clean this callback up completely and update the corresponding gtests to remove the usage of the pubsub?
// Only free the object if it is not already freed.
void LocalObjectManager::ReleaseFreedLocalObject(const ObjectID &object_id) {
  // Primary-copy bookkeeping only runs when we actually hold the primary copy
  // and it hasn't been freed yet. Secondary copies fall through to the enqueue
Can you explain what you mean by "secondary copies fall through to the enqueue below"? I'm still a bit confused about why you needed to remove the early return here. How were secondary copies cleaned up prior to this change? I'm assuming that, since the local object manager tracks primary copies only, the early return caught them, so something else must have cleaned them up?
Originally, this path was only called for primary copies, which then triggered the FreeObjects broadcast. Now that every local object goes through this path, for primary copies we remove the bookkeeping, and for secondary copies we just enqueue them to be freed.
      return GetCoreWorker()->gcs_client_->Nodes().IsNodeDead(node_id);
    },
    /*spread_free_local_objects=*/
    [this](const ObjectID &object_id, const std::vector<NodeID> &locations) {
nit: seems like the only place that this is called has the locations in an unordered set. Do we need to do the conversion to a vector here?
Good idea, done!
  send_reply_callback(Status::OK(), nullptr, nullptr);
}

// TODO(aaronscalene) will delete local_only=false and related dead code in #63213
  }
}

std::vector<ObjectID> LocalObjectManager::GetLocalObjectsOwnedBy(
super nit: this is duplicated in both object_manager and local_object_manager. What you could do is add these functions into a shared util file and pass the local_objects_ as a parameter.
Hmm, this might be hard because the LocalObjectInfo in object_manager is different from the LocalObjectInfo in local_object_manager, so there would still need to be four separate definitions unless I add methods to the LocalObjectInfos to reconcile them, which I don't really like. I decided to move LocalObjectsMatchedBy into a shared util file, which I think is beneficial.
Sparks0219 left a comment
LGTM, nice job! @edoakes could you PTAL and merge?
Sparks0219 left a comment
I think it would be helpful to point out the failure model a bit more explicitly in the PR description.
1.) When the worker process dies (but the node remains alive), the raylet picks up the worker death relatively quickly via the broken IPC connection and sends the worker death up to the GCS, and the GCS broadcasts it. Meanwhile, the long-poll connection would be sitting in the retryable gRPC client queue, waiting for either the connection to come back up or the GCS to get a worker death notification.
2.) On node death, similar story.
Hence all in all, there really shouldn't be a significant change in behavior.
void LocalObjectManager::ReleaseFreedLocalObject(const ObjectID &object_id) {
  // This is called for both primary and secondary copies. For secondary copies, they
  // should just be queued up to be freed below. For primary copies, additional
  // bookkeeping is needed to ensure there is no regression.
what does "no regression" mean?
Good catch, bad comment. Changed it to a more suitable comment.
  [&](const std::vector<ray::ObjectID> &object_ids) {
    object_manager->FreeObjects(object_ids,
-                               /*local_only=*/false);
+                               /*local_only=*/true);
local_only flag will be removed in a future PR?
for (const auto &id : object_manager_.GetLocalObjectsOwnedByOwnersOn(node_id)) {
  ids.insert(id);
}
RAY_UNUSED(FreeLocalObjects(std::vector<ObjectID>(ids.begin(), ids.end())));
why RAY_UNUSED? either handle the error if it's recoverable (at minimum a warning) or make it a RAY_CHECK
Artifact of past implementation, no need for status anymore so changed to void, good catch.
    publisher_.get(),
    subscriber_.get(),
    /*is_node_dead=*/[](const NodeID &) { return false; },
    /*spread_free_local_objects=*/
nit: "spread" is a bit misleading here as it indicates that we are sending the RPC across the cluster
I would call this something like free_objects_on_nodes_async
void CoreWorker::SpreadFreeLocalObjects(const ObjectID &object_id,
                                        const std::vector<NodeID> &locations) {
same comment on naming as above, something like FreeObjectOnNodesAsync
The "local" naming here is also a bit confusing -- it makes sense in the RPC to a specific raylet, but here we are actually freeing the object across non-local nodes.
Makes sense, done
      if (!status.ok()) {
        RAY_LOG(WARNING).WithField(object_id)
            << "FreeLocalObjects RPC to node " << node_id
            << " failed: " << status.ToString();
      }
    });
Does the RPC retry automatically? In what case would we ever get !status.ok() and how should we handle it?
RPC does retry automatically as it is through RetryableGrpcClient. Thus, for any network related errors, this should automatically retry. I believe this status case will only happen when the Raylet dies. I looked at other implementations and they seem to just log a warning within the raylet client, which is what I ended up doing.
Hm it would be good to have a specific status for that (node died error) and possibly use the statusset for this @Sparks0219. That way clients are forced to explicitly handle a subset of possible errors as applicable.
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 9c25256.

Motivation
Across this multi-step series of PRs, our goal is to make the FreeObjects implementation owner-driven. The current design sprays the entire cluster with FreeObjects RPCs. This has several drawbacks: it inflates FreeObjects RPC counts in observability, sends excessive RPCs, and diverges from the mental model of owner-driven reference counting. This PR is a continuation of #63117. Due to the race condition detailed in the Testing section, we decided to merge both changes into one complete PR.
Our current design for FreeObjects involves the following steps:
Design
As noted above, the owner stores the locations of all copies of the objects, where pinned_at_node_id_ represents the primary copy location and the locations vector holds all secondary object locations.
We now move towards a new design for FreeObjects. We will get rid of the pubsub between owner -> primary copy raylet. Instead:
However, this doesn't easily solve our issue, since the pubsub also has an owner_dead_callback, which fires when the owner has died. This callback cleans up the primary object and ensures there are no memory leaks in this case. To continue supporting this functionality, owner death is propagated by the GCS, and each raylet listens to the GCS events and updates its state accordingly.
Originally, the design was that:
The new design:
1.) When the worker process dies (but the node remains alive), the raylet picks up the worker death relatively quickly via the broken IPC connection and sends the worker death up to the GCS, and GCS broadcasts it.
2.) On node death, similar story.
In the old implementation, by contrast, the long-poll connection would sit in the retryable gRPC client queue, waiting for either the connection to come back up or the GCS to get a worker death notification. Hence, all in all, there should not be a significant change in behavior.
Description
This PR covers the first and second parts of the FreeObjects change. The first part strictly adds functionality that puts the owner in charge of freeing objects: as a first step away from the broadcast design, the owner sends a new FreeLocalObjects RPC to all primary and secondary copy locations when OnObjectsOutOfScopeOrFreed is invoked. The second part changes owner-death handling from an eager pubsub death callback to a more passive listener callback.
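The owner-side fan-out described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's implementation: `FreeObjectOnNodes`, `SendFreeLocalObjects`, and the string-based `ObjectID` / `NodeID` aliases are hypothetical stand-ins, and the real code sends the RPC through the raylet client rather than a plain callback.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical stand-ins for Ray's ObjectID and NodeID types.
using ObjectID = std::string;
using NodeID = std::string;

// Hypothetical hook that issues one FreeLocalObjects RPC to one raylet.
using SendFreeLocalObjects =
    std::function<void(const NodeID &, const std::vector<ObjectID> &)>;

// Sends one free request per distinct node holding a copy of the object.
// An empty `primary` means the object has no pinned primary copy.
// Returns the number of requests issued.
std::size_t FreeObjectOnNodes(const ObjectID &object_id,
                              const NodeID &primary,
                              const std::unordered_set<NodeID> &secondary,
                              const SendFreeLocalObjects &send) {
  std::unordered_set<NodeID> targets = secondary;
  if (!primary.empty()) {
    targets.insert(primary);  // Deduplicates if the primary is also listed.
  }
  for (const auto &node : targets) {
    send(node, {object_id});
  }
  return targets.size();
}
```

Collecting the targets in a set first means a node that appears as both the primary and a secondary location receives a single request.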
Considerations
NodeManager RPC vs. ObjectManager RPC
For this PR, I considered either 1) adding a new RPC to NodeManager or 2) adding a new route that calls into the existing ObjectManager FreeObjects. Ultimately, I decided to go with 1). One consideration is that workers currently only call into raylets' NodeManager, so this keeps with the existing implementation. Another is that the new HandleFreeLocalObjects is responsible for two things: 1) if the copy is a primary copy, delete its metadata from LocalObjectManager (which manages spilled and primary copies), and 2) for any copy, queue it up within FreeObjects. NodeManager already has functionality that calls into LocalObjectManager. Furthermore, I find that interfacing directly with ObjectManager is less intuitive, and creating a new path just for ObjectManager does not seem ideal.
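The two responsibilities of the handler can be pictured with the simplified sketch below. `RayletSketch` and its fields are hypothetical: the set stands in for LocalObjectManager's primary-copy bookkeeping and the vector for the queue of objects handed to FreeObjects. The real handler works on Ray's own types and replies through an RPC callback.

```cpp
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical stand-in for Ray's ObjectID type.
using ObjectID = std::string;

// Hypothetical, simplified view of the raylet-side state.
struct RayletSketch {
  std::unordered_set<ObjectID> primary_copies;  // Primary-copy bookkeeping.
  std::vector<ObjectID> free_queue;             // Objects queued to be freed.
};

// For each requested object: drop primary-copy bookkeeping if present,
// then enqueue the object to be freed regardless of copy type.
void HandleFreeLocalObjects(RayletSketch &raylet,
                            const std::vector<ObjectID> &object_ids) {
  for (const auto &id : object_ids) {
    raylet.primary_copies.erase(id);  // No-op for secondary copies.
    raylet.free_queue.push_back(id);
  }
}
```

Because erasing a missing key is a no-op, secondary copies need no special branch: they simply reach the enqueue step.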
Moving Around ReferenceCounter Variables
For this PR, I decided to follow similar patterns in the codebase and move all the lambdas in the constructor to the end of the argument list. This way, they can be treated as optional arguments that default to null when not specified, which makes tests easier to write. The tricky part is ensuring that each of these functions is a no-op when it is not specified.
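A minimal sketch of that pattern, assuming a trailing `std::function` parameter that defaults to null and is checked before every call. `RefCounterSketch` and `OnOutOfScope` are hypothetical names for illustration and do not mirror the actual ReferenceCounter signature.

```cpp
#include <functional>
#include <string>
#include <unordered_set>
#include <utility>

// Hypothetical stand-ins for Ray's ObjectID and NodeID types.
using ObjectID = std::string;
using NodeID = std::string;

class RefCounterSketch {
 public:
  using FreeFn =
      std::function<void(const ObjectID &, const std::unordered_set<NodeID> &)>;

  // The callback is last and defaults to null, so tests can omit it.
  explicit RefCounterSketch(FreeFn free_fn = nullptr)
      : free_fn_(std::move(free_fn)) {}

  // Returns true if the callback ran, false if it was unset (no-op).
  bool OnOutOfScope(const ObjectID &object_id,
                    const std::unordered_set<NodeID> &locations) {
    if (!free_fn_) {
      return false;  // Calling an empty std::function would throw.
    }
    free_fn_(object_id, locations);
    return true;
  }

 private:
  FreeFn free_fn_;
};
```

The explicit null check matters because invoking an empty `std::function` throws `std::bad_function_call` rather than silently doing nothing.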
Only Owner Frees
Now that frees are sent from the worker side, a worker should send RPC requests to the object locations only if the freed object is owned by that worker. Otherwise, the worker should send nothing.
Dead Code Deleted in Future
For this PR, I kept FreeObjects's local_only=false case alive, even though this case is essentially never called anymore. In the future, I will delete this dead code.
Testing Changes and ReferenceCounter Changes
This PR needed many test changes because the new path differs from the original one. For now, many of these tests are hardcoded to match the correct behavior, but in the future some unnecessary tests might be deleted or moved around to match the new implementation.
Pass Over All Objects
This new design introduces a pass over all objects on the current node for every death announcement. However, node and worker failure announcements should be uncommon, and for objects not owned by the failed worker / node the pass is a no-op; thus this should not cause latency issues.
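The per-announcement pass amounts to filtering the local object table by owner, roughly as sketched below. The review thread mentions a shared `LocalObjectsMatchedBy` util, but its real signature is not shown in this PR text, so the template here is an assumption, as are `ObjectInfoSketch` and `ObjectsToFreeOnWorkerDeath`.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-ins for Ray's ObjectID and WorkerID types.
using ObjectID = std::string;
using WorkerID = std::string;

// Hypothetical per-object metadata; only the owner field matters here.
struct ObjectInfoSketch {
  WorkerID owner_worker_id;
};

// Assumed shape of a shared helper: one linear pass over a map of local
// objects, collecting the IDs whose metadata satisfies the predicate.
template <typename Map, typename Pred>
std::vector<ObjectID> LocalObjectsMatchedBy(const Map &local_objects, Pred pred) {
  std::vector<ObjectID> matched;
  for (const auto &entry : local_objects) {
    if (pred(entry.second)) {
      matched.push_back(entry.first);
    }
  }
  return matched;
}

// Objects to free when a worker death is announced.
std::vector<ObjectID> ObjectsToFreeOnWorkerDeath(
    const std::unordered_map<ObjectID, ObjectInfoSketch> &local_objects,
    const WorkerID &dead_worker) {
  return LocalObjectsMatchedBy(
      local_objects, [&dead_worker](const ObjectInfoSketch &info) {
        return info.owner_worker_id == dead_worker;
      });
}
```

The cost is linear in the number of local objects per announcement, and objects owned by other workers contribute only a comparison.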
Owner is Less Eager
The new design frees less eagerly, especially for the primary copy on a node. This is an important point to consider; however, since death information still arrives through a similar pubsub for node information, the added latency should be small. Furthermore, the pubsub interface already handles network failures to some extent, and if there are further failures or GCS failures, there are probably worse problems to deal with.
LocalObjectManager local_objects_ vs. ObjectManager local_objects_
The two classes' local_objects_ must be distinguished, because we need to aggregate and free the objects tracked by both. LocalObjectManager keeps track of primary copies as well as spilled copies. ObjectManager keeps track of anything in plasma, such as primary and secondary copies. Thus, we take the union of both sets to get the objects that we want to free.
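Taking that union can be sketched as below, in the spirit of the `ids.insert(id)` loop visible in the diff. `UnionOfLocalObjects` and the string-based `ObjectID` alias are hypothetical; the two input vectors stand in for the results returned by each manager.

```cpp
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical stand-in for Ray's ObjectID type.
using ObjectID = std::string;

// Merges the IDs reported by the two managers, dropping duplicates such as
// a primary copy that both of them track.
std::vector<ObjectID> UnionOfLocalObjects(
    const std::vector<ObjectID> &from_local_object_manager,
    const std::vector<ObjectID> &from_object_manager) {
  std::unordered_set<ObjectID> ids(from_local_object_manager.begin(),
                                   from_local_object_manager.end());
  ids.insert(from_object_manager.begin(), from_object_manager.end());
  return std::vector<ObjectID>(ids.begin(), ids.end());
}
```

The order of the result is unspecified, which is acceptable here because the IDs are only passed on to be freed.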
Testing
Ran the Python and C++ tests adjacent to these changes to ensure no regressions. Further e2e testing will follow in #63181.
We will fix this race condition in the immediate next PR, which uses the GCS to broadcast deaths and has the raylets free accordingly.